1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
/// Alias for `crate::db::query::plan::DeleteSpec`, exposed from this module.
pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
/// Alias for `crate::db::query::plan::LoadSpec`, exposed from this module.
pub type LoadSpec = crate::db::query::plan::LoadSpec;
/// Alias for `crate::db::query::plan::QueryMode`, exposed from this module.
pub type QueryMode = crate::db::query::plan::QueryMode;
9
10use crate::{
11 db::{
12 access::{AccessPath, AccessPlan, AccessPlanError},
13 contracts::{ReadConsistency, SchemaInfo},
14 executor::ExecutablePlan,
15 policy,
16 query::{
17 explain::ExplainPlan,
18 expr::{FilterExpr, SortExpr, SortLowerError},
19 plan::{
20 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
21 PageSpec, PlanError, canonical,
22 planner::{PlannerError, plan_access},
23 validate::validate_query_semantics,
24 },
25 predicate::{
26 Predicate, ValidateError, normalize, normalize_enum_literals,
27 validate::reject_unsupported_query_features,
28 },
29 },
30 response::ResponseError,
31 },
32 error::InternalError,
33 model::entity::EntityModel,
34 traits::{EntityKind, FieldValue, SingletonEntity},
35 value::Value,
36};
37use std::marker::PhantomData;
38use thiserror::Error as ThisError;
39
/// Direct primary-key access requested on a query, generic over the key type `K`.
///
/// Lowered into an untyped access plan by `access_plan_from_keys_value`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// Access by exactly one key.
    Single(K),
    /// Access by a caller-supplied list of keys.
    Many(Vec<K>),
}
50
/// Which builder method produced a key-access request.
///
/// `QueryModel::set_key_access` compares kinds to detect conflicting requests, and
/// `QueryModel::validate_intent` uses the kind to decide whether a predicate is allowed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Recorded by `QueryModel::by_id`.
    Single,
    /// Recorded by `QueryModel::by_ids`.
    Many,
    /// Recorded by `QueryModel::only`.
    Only,
}
62
/// A key-access request together with the kind of builder call that created it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct KeyAccessState<K> {
    /// Builder method family that recorded this request.
    pub kind: KeyAccessKind,
    /// The key or keys to access.
    pub access: KeyAccess<K>,
}
73
74pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
76where
77 K: FieldValue,
78{
79 match access {
80 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
81 KeyAccess::Many(keys) => {
82 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
83 canonical::canonicalize_key_values(&mut values);
84 if let Some(first) = values.first()
85 && values.len() == 1
86 {
87 return AccessPlan::path(AccessPath::ByKey(first.clone()));
88 }
89
90 AccessPlan::path(AccessPath::ByKeys(values))
91 }
92 }
93}
94
95pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
97 model: &EntityModel,
98 access: AccessPlan<Value>,
99) -> Result<AccessPlan<E::Key>, PlanError> {
100 access.into_executable::<E>(model)
101}
102
103pub(crate) fn coerce_entity_key<E: EntityKind>(
105 model: &EntityModel,
106 key: &Value,
107) -> Result<E::Key, PlanError> {
108 E::Key::from_value(key).ok_or_else(|| {
109 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
110 field: model.primary_key.name.to_string(),
111 key: key.clone(),
112 })
113 })
114}
115
116impl AccessPlan<Value> {
117 pub(crate) fn into_executable<E: EntityKind>(
119 self,
120 model: &EntityModel,
121 ) -> Result<AccessPlan<E::Key>, PlanError> {
122 match self {
123 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
124 Self::Union(children) => {
125 let mut out = Vec::with_capacity(children.len());
126 for child in children {
127 out.push(child.into_executable::<E>(model)?);
128 }
129
130 Ok(AccessPlan::Union(out))
131 }
132 Self::Intersection(children) => {
133 let mut out = Vec::with_capacity(children.len());
134 for child in children {
135 out.push(child.into_executable::<E>(model)?);
136 }
137
138 Ok(AccessPlan::Intersection(out))
139 }
140 }
141 }
142}
143
144impl AccessPath<Value> {
145 pub(crate) fn into_executable<E: EntityKind>(
147 self,
148 model: &EntityModel,
149 ) -> Result<AccessPath<E::Key>, PlanError> {
150 match self {
151 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
152 Self::ByKeys(keys) => {
153 let mut out = Vec::with_capacity(keys.len());
154 for key in keys {
155 out.push(coerce_entity_key::<E>(model, &key)?);
156 }
157
158 Ok(AccessPath::ByKeys(out))
159 }
160 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
161 start: coerce_entity_key::<E>(model, &start)?,
162 end: coerce_entity_key::<E>(model, &end)?,
163 }),
164 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
165 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
166 Self::FullScan => Ok(AccessPath::FullScan),
167 }
168 }
169}
170
/// Builder-style description of a query against one entity model, generic over the
/// key type `K`.
///
/// Collects what the caller asked for; `build_plan_model` validates it and turns it
/// into an access-planned query.
#[derive(Debug)]
pub(crate) struct QueryModel<'m, K> {
    /// Entity model the query targets.
    model: &'m EntityModel,
    /// Load or delete mode, carrying the limit/offset settings.
    mode: QueryMode,
    /// Accumulated filter; repeated `filter` calls are combined with `Predicate::And`.
    predicate: Option<Predicate>,
    /// Most recent direct key-access request, if any.
    key_access: Option<KeyAccessState<K>>,
    /// Set when key access was requested with two different kinds; reported by
    /// `validate_intent`.
    key_access_conflict: bool,
    /// Requested ordering, if any.
    order: Option<OrderSpec>,
    /// Whether `distinct()` was requested.
    distinct: bool,
    /// Read consistency passed through to the logical plan.
    consistency: ReadConsistency,
}
189
190impl<'m, K: FieldValue> QueryModel<'m, K> {
191 #[must_use]
192 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
193 Self {
194 model,
195 mode: QueryMode::Load(LoadSpec::new()),
196 predicate: None,
197 key_access: None,
198 key_access_conflict: false,
199 order: None,
200 distinct: false,
201 consistency,
202 }
203 }
204
205 #[must_use]
207 pub(crate) const fn mode(&self) -> QueryMode {
208 self.mode
209 }
210
211 #[must_use]
212 fn has_explicit_order(&self) -> bool {
213 policy::has_explicit_order(self.order.as_ref())
214 }
215
216 #[must_use]
217 const fn load_spec(&self) -> Option<LoadSpec> {
218 match self.mode {
219 QueryMode::Load(spec) => Some(spec),
220 QueryMode::Delete(_) => None,
221 }
222 }
223
224 #[must_use]
226 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
227 self.predicate = match self.predicate.take() {
228 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
229 None => Some(predicate),
230 };
231 self
232 }
233
234 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
236 let schema = SchemaInfo::from_entity_model(self.model)?;
237 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
238
239 Ok(self.filter(predicate))
240 }
241
242 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
244 let schema = SchemaInfo::from_entity_model(self.model)?;
245 let order = match expr.lower_with(&schema) {
246 Ok(order) => order,
247 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
248 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
249 };
250
251 policy::validate_order_shape(Some(&order))
252 .map_err(IntentError::from)
253 .map_err(QueryError::from)?;
254
255 Ok(self.order_spec(order))
256 }
257
258 #[must_use]
260 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
261 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
262 self
263 }
264
265 #[must_use]
267 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
268 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
269 self
270 }
271
272 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
274 self.order = Some(order);
275 self
276 }
277
278 #[must_use]
280 pub(crate) const fn distinct(mut self) -> Self {
281 self.distinct = true;
282 self
283 }
284
285 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
287 if let Some(existing) = &self.key_access
288 && existing.kind != kind
289 {
290 self.key_access_conflict = true;
291 }
292
293 self.key_access = Some(KeyAccessState { kind, access });
294
295 self
296 }
297
298 pub(crate) fn by_id(self, id: K) -> Self {
300 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
301 }
302
303 pub(crate) fn by_ids<I>(self, ids: I) -> Self
305 where
306 I: IntoIterator<Item = K>,
307 {
308 self.set_key_access(
309 KeyAccessKind::Many,
310 KeyAccess::Many(ids.into_iter().collect()),
311 )
312 }
313
314 pub(crate) fn only(self, id: K) -> Self {
316 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
317 }
318
319 #[must_use]
321 pub(crate) const fn delete(mut self) -> Self {
322 if self.mode.is_load() {
323 self.mode = QueryMode::Delete(DeleteSpec::new());
324 }
325 self
326 }
327
328 #[must_use]
332 pub(crate) const fn limit(mut self, limit: u32) -> Self {
333 match self.mode {
334 QueryMode::Load(mut spec) => {
335 spec.limit = Some(limit);
336 self.mode = QueryMode::Load(spec);
337 }
338 QueryMode::Delete(mut spec) => {
339 spec.limit = Some(limit);
340 self.mode = QueryMode::Delete(spec);
341 }
342 }
343 self
344 }
345
346 #[must_use]
348 pub(crate) const fn offset(mut self, offset: u32) -> Self {
349 if let QueryMode::Load(mut spec) = self.mode {
350 spec.offset = offset;
351 self.mode = QueryMode::Load(spec);
352 }
353 self
354 }
355
356 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
358 let schema_info = SchemaInfo::from_entity_model(self.model)?;
360 self.validate_intent()?;
361
362 let normalized_predicate = self
364 .predicate
365 .as_ref()
366 .map(|predicate| {
367 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
368 let predicate = normalize_enum_literals(&schema_info, predicate)?;
369 Ok::<Predicate, ValidateError>(normalize(&predicate))
370 })
371 .transpose()?;
372 let access_plan_value = match &self.key_access {
373 Some(state) => access_plan_from_keys_value(&state.access),
374 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
375 };
376
377 let logical = LogicalPlan {
379 mode: self.mode,
380 predicate: normalized_predicate,
381 order: canonicalize_order_spec(self.model, self.order.clone()),
384 distinct: self.distinct,
385 delete_limit: match self.mode {
386 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
387 QueryMode::Load(_) => None,
388 },
389 page: match self.mode {
390 QueryMode::Load(spec) => {
391 if spec.limit.is_some() || spec.offset > 0 {
392 Some(PageSpec {
393 limit: spec.limit,
394 offset: spec.offset,
395 })
396 } else {
397 None
398 }
399 }
400 QueryMode::Delete(_) => None,
401 },
402 consistency: self.consistency,
403 };
404 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
405
406 validate_query_semantics(&schema_info, self.model, &plan)?;
407
408 Ok(plan)
409 }
410
411 fn validate_intent(&self) -> Result<(), IntentError> {
413 if self.key_access_conflict {
414 return Err(IntentError::KeyAccessConflict);
415 }
416
417 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
418 .map_err(IntentError::from)?;
419
420 if let Some(state) = &self.key_access {
421 match state.kind {
422 KeyAccessKind::Many if self.predicate.is_some() => {
423 return Err(IntentError::ByIdsWithPredicate);
424 }
425 KeyAccessKind::Only if self.predicate.is_some() => {
426 return Err(IntentError::OnlyWithPredicate);
427 }
428 _ => {
429 }
431 }
432 }
433
434 Ok(())
435 }
436}
437
/// A successfully planned query for entity `E`.
///
/// Wraps an access-planned query keyed by `E::Key`; it can be explained or converted
/// into an `ExecutablePlan<E>`.
#[derive(Debug)]
pub struct PlannedQuery<E: EntityKind> {
    /// The typed access-planned query.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the value to the entity type without storing an `E`.
    _marker: PhantomData<E>,
}
460
461impl<E: EntityKind> PlannedQuery<E> {
462 #[must_use]
463 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
464 Self {
465 plan,
466 _marker: PhantomData,
467 }
468 }
469
470 #[must_use]
471 pub fn explain(&self) -> ExplainPlan {
472 self.plan.explain_with_model(E::MODEL)
473 }
474
475 #[must_use]
476 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
477 self.plan
478 }
479}
480
481impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
482 fn from(value: PlannedQuery<E>) -> Self {
483 Self::new(value.into_inner())
484 }
485}
486
/// Typed query builder for entity `E`.
///
/// Wraps a `QueryModel` bound to `E::Key` and forwards the builder calls to it.
#[derive(Debug)]
pub struct Query<E: EntityKind> {
    /// The underlying model-level intent.
    intent: QueryModel<'static, E::Key>,
    /// Ties the builder to the entity type without storing an `E`.
    _marker: PhantomData<E>,
}
492
493impl<E: EntityKind> Query<E> {
494 #[must_use]
498 pub const fn new(consistency: ReadConsistency) -> Self {
499 Self {
500 intent: QueryModel::new(E::MODEL, consistency),
501 _marker: PhantomData,
502 }
503 }
504
505 #[must_use]
507 pub const fn mode(&self) -> QueryMode {
508 self.intent.mode()
509 }
510
511 #[must_use]
512 pub(crate) fn has_explicit_order(&self) -> bool {
513 self.intent.has_explicit_order()
514 }
515
516 #[must_use]
517 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
518 self.intent.load_spec()
519 }
520
521 #[must_use]
523 pub fn filter(mut self, predicate: Predicate) -> Self {
524 self.intent = self.intent.filter(predicate);
525 self
526 }
527
528 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
530 let Self { intent, _marker } = self;
531 let intent = intent.filter_expr(expr)?;
532
533 Ok(Self { intent, _marker })
534 }
535
536 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
538 let Self { intent, _marker } = self;
539 let intent = intent.sort_expr(expr)?;
540
541 Ok(Self { intent, _marker })
542 }
543
544 #[must_use]
546 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
547 self.intent = self.intent.order_by(field);
548 self
549 }
550
551 #[must_use]
553 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
554 self.intent = self.intent.order_by_desc(field);
555 self
556 }
557
558 #[must_use]
560 pub fn distinct(mut self) -> Self {
561 self.intent = self.intent.distinct();
562 self
563 }
564
565 pub(crate) fn by_id(self, id: E::Key) -> Self {
567 let Self { intent, _marker } = self;
568 Self {
569 intent: intent.by_id(id),
570 _marker,
571 }
572 }
573
574 pub(crate) fn by_ids<I>(self, ids: I) -> Self
576 where
577 I: IntoIterator<Item = E::Key>,
578 {
579 let Self { intent, _marker } = self;
580 Self {
581 intent: intent.by_ids(ids),
582 _marker,
583 }
584 }
585
586 #[must_use]
588 pub fn delete(mut self) -> Self {
589 self.intent = self.intent.delete();
590 self
591 }
592
593 #[must_use]
599 pub fn limit(mut self, limit: u32) -> Self {
600 self.intent = self.intent.limit(limit);
601 self
602 }
603
604 #[must_use]
608 pub fn offset(mut self, offset: u32) -> Self {
609 self.intent = self.intent.offset(offset);
610 self
611 }
612
613 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
615 let plan = self.planned()?;
616
617 Ok(plan.explain())
618 }
619
620 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
622 let plan = self.build_plan()?;
623
624 Ok(PlannedQuery::new(plan))
625 }
626
627 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
629 self.planned().map(ExecutablePlan::from)
630 }
631
632 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
634 let plan_value = self.intent.build_plan_model()?;
635 let (logical, access) = plan_value.into_parts();
636 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
637 let plan = AccessPlannedQuery::from_parts(logical, access);
638
639 Ok(plan)
640 }
641}
642
643impl<E> Query<E>
644where
645 E: EntityKind + SingletonEntity,
646 E::Key: Default,
647{
648 pub(crate) fn only(self) -> Self {
650 let Self { intent, _marker } = self;
651
652 Self {
653 intent: intent.only(E::Key::default()),
654 _marker,
655 }
656 }
657}
658
/// Error type returned by the query builder, planner and related entry points.
///
/// Every variant displays the message of the error it wraps.
#[derive(Debug, ThisError)]
pub enum QueryError {
    /// Predicate or schema validation failed.
    #[error("{0}")]
    Validate(#[from] ValidateError),

    /// Planning failed; boxed, with conversion provided by the manual
    /// `From<PlanError>` impl rather than `#[from]`.
    #[error("{0}")]
    Plan(Box<PlanError>),

    /// The query intent was rejected before planning.
    #[error("{0}")]
    Intent(#[from] IntentError),

    /// A response-level error.
    #[error("{0}")]
    Response(#[from] ResponseError),

    /// An internal error; also used for `PlannerError::Internal`.
    #[error("{0}")]
    Execute(#[from] InternalError),
}
680
681impl From<PlannerError> for QueryError {
682 fn from(err: PlannerError) -> Self {
683 match err {
684 PlannerError::Plan(err) => Self::from(*err),
685 PlannerError::Internal(err) => Self::Execute(*err),
686 }
687 }
688}
689
690impl From<PlanError> for QueryError {
691 fn from(err: PlanError) -> Self {
692 Self::Plan(Box::new(err))
693 }
694}
695
/// Reasons a query intent is rejected before planning.
#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// A plan-shape policy violation; displays the wrapped policy error's message.
    #[error("{0}")]
    PlanShape(#[from] policy::PlanPolicyError),

    /// `by_ids()` key access was combined with a predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// `only()` key access was combined with a predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// Key access was requested with more than one kind on the same query.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Cursor pagination was requested without an explicit ordering.
    #[error("cursor pagination requires an explicit ordering")]
    CursorRequiresOrder,

    /// Cursor pagination was requested without an explicit limit.
    #[error("cursor pagination requires an explicit limit")]
    CursorRequiresLimit,

    /// A cursor token was used outside paged execution.
    #[error("cursor tokens can only be used with .page().execute()")]
    CursorRequiresPagedExecution,
}
723
724impl From<policy::CursorPagingPolicyError> for IntentError {
725 fn from(err: policy::CursorPagingPolicyError) -> Self {
726 match err {
727 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
728 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
729 }
730 }
731}
732
733fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
735 match order {
736 Some(mut spec) => {
737 spec.fields.push((field.to_string(), direction));
738 spec
739 }
740 None => OrderSpec {
741 fields: vec![(field.to_string(), direction)],
742 },
743 }
744}
745
746fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
751 let mut order = order?;
752 if order.fields.is_empty() {
753 return Some(order);
754 }
755
756 let pk_field = model.primary_key.name;
757 let mut pk_direction = None;
758 order.fields.retain(|(field, direction)| {
759 if field == pk_field {
760 if pk_direction.is_none() {
761 pk_direction = Some(*direction);
762 }
763 false
764 } else {
765 true
766 }
767 });
768
769 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
770 order.fields.push((pk_field.to_string(), pk_direction));
771
772 Some(order)
773}