1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
/// Alias for the planner's delete specification (`crate::db::query::plan::DeleteSpec`),
/// the payload carried by `QueryMode::Delete`.
pub type DeleteSpec = crate::db::query::plan::DeleteSpec;
/// Alias for the planner's load specification (`crate::db::query::plan::LoadSpec`),
/// the payload carried by `QueryMode::Load`.
pub type LoadSpec = crate::db::query::plan::LoadSpec;
/// Alias for the planner's query mode (`crate::db::query::plan::QueryMode`),
/// which distinguishes load queries from delete queries.
pub type QueryMode = crate::db::query::plan::QueryMode;
9
10use crate::{
11 db::{
12 access::{AccessPath, AccessPlan, AccessPlanError},
13 contracts::{Predicate, ReadConsistency, SchemaInfo, ValidateError},
14 policy,
15 query::{
16 explain::ExplainPlan,
17 expr::{FilterExpr, SortExpr, SortLowerError},
18 plan::validate::validate_query_semantics,
19 plan::{
20 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
21 PageSpec, PlanError, PlannerError, canonical, plan_access,
22 },
23 predicate::{
24 lower_to_execution_model, normalize, normalize_enum_literals,
25 validate::reject_unsupported_query_features,
26 },
27 },
28 response::ResponseError,
29 },
30 error::InternalError,
31 model::entity::EntityModel,
32 traits::{EntityKind, FieldValue, SingletonEntity},
33 value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
/// Explicit primary-key selection attached to a query intent.
///
/// Converted into a value-level access plan by `access_plan_from_keys_value`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccess<K> {
    /// Exactly one key.
    Single(K),
    /// A caller-supplied list of keys, stored as given (canonicalized only when planned).
    Many(Vec<K>),
}
48
/// Records which builder call produced the current key access.
///
/// `QueryModel::set_key_access` compares kinds to detect conflicting key-access calls,
/// and `QueryModel::validate_intent` uses the kind to reject predicate combinations.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum KeyAccessKind {
    /// Set by `by_id`.
    Single,
    /// Set by `by_ids`.
    Many,
    /// Set by `only`.
    Only,
}
60
/// A key access together with the kind of builder call that requested it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct KeyAccessState<K> {
    /// Which builder call (`by_id`, `by_ids`, `only`) set this access.
    pub kind: KeyAccessKind,
    /// The key or keys that were requested.
    pub access: KeyAccess<K>,
}
71
72pub(crate) fn access_plan_from_keys_value<K>(access: &KeyAccess<K>) -> AccessPlan<Value>
74where
75 K: FieldValue,
76{
77 match access {
78 KeyAccess::Single(key) => AccessPlan::path(AccessPath::ByKey(key.to_value())),
79 KeyAccess::Many(keys) => {
80 let mut values: Vec<Value> = keys.iter().map(FieldValue::to_value).collect();
81 canonical::canonicalize_key_values(&mut values);
82 if let Some(first) = values.first()
83 && values.len() == 1
84 {
85 return AccessPlan::path(AccessPath::ByKey(first.clone()));
86 }
87
88 AccessPlan::path(AccessPath::ByKeys(values))
89 }
90 }
91}
92
93pub(crate) fn access_plan_to_entity_keys<E: EntityKind>(
95 model: &EntityModel,
96 access: AccessPlan<Value>,
97) -> Result<AccessPlan<E::Key>, PlanError> {
98 access.into_executable::<E>(model)
99}
100
101pub(crate) fn coerce_entity_key<E: EntityKind>(
103 model: &EntityModel,
104 key: &Value,
105) -> Result<E::Key, PlanError> {
106 E::Key::from_value(key).ok_or_else(|| {
107 PlanError::from(AccessPlanError::PrimaryKeyMismatch {
108 field: model.primary_key.name.to_string(),
109 key: key.clone(),
110 })
111 })
112}
113
114impl AccessPlan<Value> {
115 pub(crate) fn into_executable<E: EntityKind>(
117 self,
118 model: &EntityModel,
119 ) -> Result<AccessPlan<E::Key>, PlanError> {
120 match self {
121 Self::Path(path) => Ok(AccessPlan::path(path.into_executable::<E>(model)?)),
122 Self::Union(children) => {
123 let mut out = Vec::with_capacity(children.len());
124 for child in children {
125 out.push(child.into_executable::<E>(model)?);
126 }
127
128 Ok(AccessPlan::Union(out))
129 }
130 Self::Intersection(children) => {
131 let mut out = Vec::with_capacity(children.len());
132 for child in children {
133 out.push(child.into_executable::<E>(model)?);
134 }
135
136 Ok(AccessPlan::Intersection(out))
137 }
138 }
139 }
140}
141
142impl AccessPath<Value> {
143 pub(crate) fn into_executable<E: EntityKind>(
145 self,
146 model: &EntityModel,
147 ) -> Result<AccessPath<E::Key>, PlanError> {
148 match self {
149 Self::ByKey(key) => Ok(AccessPath::ByKey(coerce_entity_key::<E>(model, &key)?)),
150 Self::ByKeys(keys) => {
151 let mut out = Vec::with_capacity(keys.len());
152 for key in keys {
153 out.push(coerce_entity_key::<E>(model, &key)?);
154 }
155
156 Ok(AccessPath::ByKeys(out))
157 }
158 Self::KeyRange { start, end } => Ok(AccessPath::KeyRange {
159 start: coerce_entity_key::<E>(model, &start)?,
160 end: coerce_entity_key::<E>(model, &end)?,
161 }),
162 Self::IndexPrefix { index, values } => Ok(AccessPath::IndexPrefix { index, values }),
163 Self::IndexRange { spec } => Ok(AccessPath::IndexRange { spec }),
164 Self::FullScan => Ok(AccessPath::FullScan),
165 }
166 }
167}
168
/// Untyped-entity query intent: everything a caller has asked for, before planning.
///
/// Built up through the builder methods on `QueryModel` and turned into an
/// `AccessPlannedQuery<Value>` by `build_plan_model`.
#[derive(Debug)]
pub(crate) struct QueryModel<'m, K> {
    /// Entity model the query is built and validated against.
    model: &'m EntityModel,
    /// Load or delete mode, carrying the mode's limit (and, for loads, offset).
    mode: QueryMode,
    /// Accumulated filter; successive `filter` calls are combined with `Predicate::And`.
    predicate: Option<Predicate>,
    /// Most recent explicit key access (`by_id`, `by_ids`, `only`), if any.
    key_access: Option<KeyAccessState<K>>,
    /// Set once key access has been requested with two different kinds.
    key_access_conflict: bool,
    /// Requested ordering, if any.
    order: Option<OrderSpec>,
    /// Whether `distinct` was requested.
    distinct: bool,
    /// Read consistency copied into the logical plan.
    consistency: ReadConsistency,
}
187
188impl<'m, K: FieldValue> QueryModel<'m, K> {
189 #[must_use]
190 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
191 Self {
192 model,
193 mode: QueryMode::Load(LoadSpec::new()),
194 predicate: None,
195 key_access: None,
196 key_access_conflict: false,
197 order: None,
198 distinct: false,
199 consistency,
200 }
201 }
202
203 #[must_use]
205 pub(crate) const fn mode(&self) -> QueryMode {
206 self.mode
207 }
208
209 #[must_use]
210 fn has_explicit_order(&self) -> bool {
211 policy::has_explicit_order(self.order.as_ref())
212 }
213
214 #[must_use]
215 const fn load_spec(&self) -> Option<LoadSpec> {
216 match self.mode {
217 QueryMode::Load(spec) => Some(spec),
218 QueryMode::Delete(_) => None,
219 }
220 }
221
222 #[must_use]
224 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
225 self.predicate = match self.predicate.take() {
226 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
227 None => Some(predicate),
228 };
229 self
230 }
231
232 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
234 let schema = SchemaInfo::from_entity_model(self.model)?;
235 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
236
237 Ok(self.filter(predicate))
238 }
239
240 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
242 let schema = SchemaInfo::from_entity_model(self.model)?;
243 let order = match expr.lower_with(&schema) {
244 Ok(order) => order,
245 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
246 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
247 };
248
249 policy::validate_order_shape(Some(&order))
250 .map_err(IntentError::from)
251 .map_err(QueryError::from)?;
252
253 Ok(self.order_spec(order))
254 }
255
256 #[must_use]
258 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
259 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
260 self
261 }
262
263 #[must_use]
265 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
266 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
267 self
268 }
269
270 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
272 self.order = Some(order);
273 self
274 }
275
276 #[must_use]
278 pub(crate) const fn distinct(mut self) -> Self {
279 self.distinct = true;
280 self
281 }
282
283 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
285 if let Some(existing) = &self.key_access
286 && existing.kind != kind
287 {
288 self.key_access_conflict = true;
289 }
290
291 self.key_access = Some(KeyAccessState { kind, access });
292
293 self
294 }
295
296 pub(crate) fn by_id(self, id: K) -> Self {
298 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
299 }
300
301 pub(crate) fn by_ids<I>(self, ids: I) -> Self
303 where
304 I: IntoIterator<Item = K>,
305 {
306 self.set_key_access(
307 KeyAccessKind::Many,
308 KeyAccess::Many(ids.into_iter().collect()),
309 )
310 }
311
312 pub(crate) fn only(self, id: K) -> Self {
314 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
315 }
316
317 #[must_use]
319 pub(crate) const fn delete(mut self) -> Self {
320 if self.mode.is_load() {
321 self.mode = QueryMode::Delete(DeleteSpec::new());
322 }
323 self
324 }
325
326 #[must_use]
330 pub(crate) const fn limit(mut self, limit: u32) -> Self {
331 match self.mode {
332 QueryMode::Load(mut spec) => {
333 spec.limit = Some(limit);
334 self.mode = QueryMode::Load(spec);
335 }
336 QueryMode::Delete(mut spec) => {
337 spec.limit = Some(limit);
338 self.mode = QueryMode::Delete(spec);
339 }
340 }
341 self
342 }
343
344 #[must_use]
346 pub(crate) const fn offset(mut self, offset: u32) -> Self {
347 if let QueryMode::Load(mut spec) = self.mode {
348 spec.offset = offset;
349 self.mode = QueryMode::Load(spec);
350 }
351 self
352 }
353
354 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
356 let schema_info = SchemaInfo::from_entity_model(self.model)?;
358 self.validate_intent()?;
359
360 let normalized_predicate = self
362 .predicate
363 .as_ref()
364 .map(|predicate| {
365 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
366 let predicate = normalize_enum_literals(&schema_info, predicate)?;
367 Ok::<Predicate, ValidateError>(normalize(&predicate))
368 })
369 .transpose()?;
370 let access_plan_value = match &self.key_access {
371 Some(state) => access_plan_from_keys_value(&state.access),
372 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
373 };
374
375 let logical = LogicalPlan {
377 mode: self.mode,
378 predicate: normalized_predicate.map(lower_to_execution_model),
379 order: canonicalize_order_spec(self.model, self.order.clone()),
382 distinct: self.distinct,
383 delete_limit: match self.mode {
384 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
385 QueryMode::Load(_) => None,
386 },
387 page: match self.mode {
388 QueryMode::Load(spec) => {
389 if spec.limit.is_some() || spec.offset > 0 {
390 Some(PageSpec {
391 limit: spec.limit,
392 offset: spec.offset,
393 })
394 } else {
395 None
396 }
397 }
398 QueryMode::Delete(_) => None,
399 },
400 consistency: self.consistency,
401 };
402 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
403
404 validate_query_semantics(&schema_info, self.model, &plan)?;
405
406 Ok(plan)
407 }
408
409 fn validate_intent(&self) -> Result<(), IntentError> {
411 if self.key_access_conflict {
412 return Err(IntentError::KeyAccessConflict);
413 }
414
415 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
416 .map_err(IntentError::from)?;
417
418 if let Some(state) = &self.key_access {
419 match state.kind {
420 KeyAccessKind::Many if self.predicate.is_some() => {
421 return Err(IntentError::ByIdsWithPredicate);
422 }
423 KeyAccessKind::Only if self.predicate.is_some() => {
424 return Err(IntentError::OnlyWithPredicate);
425 }
426 _ => {
427 }
429 }
430 }
431
432 Ok(())
433 }
434}
435
/// A query that has been planned for entity `E`.
///
/// Wraps the typed `AccessPlannedQuery<E::Key>`; produced by `Query::planned`.
#[derive(Debug)]
pub struct PlannedQuery<E: EntityKind> {
    /// The planned query keyed by the entity's key type.
    plan: AccessPlannedQuery<E::Key>,
    /// Ties the value to entity `E` without storing one.
    _marker: PhantomData<E>,
}
458
459impl<E: EntityKind> PlannedQuery<E> {
460 #[must_use]
461 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
462 Self {
463 plan,
464 _marker: PhantomData,
465 }
466 }
467
468 #[must_use]
469 pub fn explain(&self) -> ExplainPlan {
470 self.plan.explain_with_model(E::MODEL)
471 }
472
473 #[must_use]
474 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
475 self.plan
476 }
477}
478
/// Typed query builder for entity `E`.
///
/// A thin wrapper over `QueryModel` bound to `E::MODEL` and keyed by `E::Key`.
#[derive(Debug)]
pub struct Query<E: EntityKind> {
    /// The accumulated query intent.
    intent: QueryModel<'static, E::Key>,
    /// Ties the builder to entity `E` without storing one.
    _marker: PhantomData<E>,
}
484
485impl<E: EntityKind> Query<E> {
486 #[must_use]
490 pub const fn new(consistency: ReadConsistency) -> Self {
491 Self {
492 intent: QueryModel::new(E::MODEL, consistency),
493 _marker: PhantomData,
494 }
495 }
496
497 #[must_use]
499 pub const fn mode(&self) -> QueryMode {
500 self.intent.mode()
501 }
502
503 #[must_use]
504 pub(crate) fn has_explicit_order(&self) -> bool {
505 self.intent.has_explicit_order()
506 }
507
508 #[must_use]
509 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
510 self.intent.load_spec()
511 }
512
513 #[must_use]
515 pub fn filter(mut self, predicate: Predicate) -> Self {
516 self.intent = self.intent.filter(predicate);
517 self
518 }
519
520 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
522 let Self { intent, _marker } = self;
523 let intent = intent.filter_expr(expr)?;
524
525 Ok(Self { intent, _marker })
526 }
527
528 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
530 let Self { intent, _marker } = self;
531 let intent = intent.sort_expr(expr)?;
532
533 Ok(Self { intent, _marker })
534 }
535
536 #[must_use]
538 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
539 self.intent = self.intent.order_by(field);
540 self
541 }
542
543 #[must_use]
545 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
546 self.intent = self.intent.order_by_desc(field);
547 self
548 }
549
550 #[must_use]
552 pub fn distinct(mut self) -> Self {
553 self.intent = self.intent.distinct();
554 self
555 }
556
557 pub(crate) fn by_id(self, id: E::Key) -> Self {
559 let Self { intent, _marker } = self;
560 Self {
561 intent: intent.by_id(id),
562 _marker,
563 }
564 }
565
566 pub(crate) fn by_ids<I>(self, ids: I) -> Self
568 where
569 I: IntoIterator<Item = E::Key>,
570 {
571 let Self { intent, _marker } = self;
572 Self {
573 intent: intent.by_ids(ids),
574 _marker,
575 }
576 }
577
578 #[must_use]
580 pub fn delete(mut self) -> Self {
581 self.intent = self.intent.delete();
582 self
583 }
584
585 #[must_use]
591 pub fn limit(mut self, limit: u32) -> Self {
592 self.intent = self.intent.limit(limit);
593 self
594 }
595
596 #[must_use]
600 pub fn offset(mut self, offset: u32) -> Self {
601 self.intent = self.intent.offset(offset);
602 self
603 }
604
605 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
607 let plan = self.planned()?;
608
609 Ok(plan.explain())
610 }
611
612 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
614 let plan = self.build_plan()?;
615
616 Ok(PlannedQuery::new(plan))
617 }
618
619 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
621 let plan_value = self.intent.build_plan_model()?;
622 let (logical, access) = plan_value.into_parts();
623 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
624 let plan = AccessPlannedQuery::from_parts(logical, access);
625
626 Ok(plan)
627 }
628}
629
630impl<E> Query<E>
631where
632 E: EntityKind + SingletonEntity,
633 E::Key: Default,
634{
635 pub(crate) fn only(self) -> Self {
637 let Self { intent, _marker } = self;
638
639 Self {
640 intent: intent.only(E::Key::default()),
641 _marker,
642 }
643 }
644}
645
/// Error type returned by the query builder and planner entry points in this module.
///
/// Every variant displays the message of the error it wraps.
#[derive(Debug, ThisError)]
pub enum QueryError {
    /// Schema or predicate validation failed.
    #[error("{0}")]
    Validate(#[from] ValidateError),

    /// Planning failed; boxed, and constructed through `From<PlanError>`.
    #[error("{0}")]
    Plan(Box<PlanError>),

    /// The query intent itself is invalid (see `IntentError`).
    #[error("{0}")]
    Intent(#[from] IntentError),

    /// Wraps a `ResponseError`.
    #[error("{0}")]
    Response(#[from] ResponseError),

    /// Wraps an `InternalError`; also produced from `PlannerError::Internal`.
    #[error("{0}")]
    Execute(#[from] InternalError),
}
667
668impl From<PlannerError> for QueryError {
669 fn from(err: PlannerError) -> Self {
670 match err {
671 PlannerError::Plan(err) => Self::from(*err),
672 PlannerError::Internal(err) => Self::Execute(*err),
673 }
674 }
675}
676
677impl From<PlanError> for QueryError {
678 fn from(err: PlanError) -> Self {
679 Self::Plan(Box::new(err))
680 }
681}
682
/// Errors describing an invalid query intent, detected before or during planning.
#[derive(Clone, Copy, Debug, ThisError)]
pub enum IntentError {
    /// A plan-shape policy rejected the intent; displays the policy error's message.
    #[error("{0}")]
    PlanShape(#[from] policy::PlanPolicyError),

    /// `by_ids()` was combined with a predicate.
    #[error("by_ids() cannot be combined with predicates")]
    ByIdsWithPredicate,

    /// `only()` was combined with a predicate.
    #[error("only() cannot be combined with predicates")]
    OnlyWithPredicate,

    /// Key access was requested through more than one kind of key-access method.
    #[error("multiple key access methods were used on the same query")]
    KeyAccessConflict,

    /// Cursor pagination was requested without an explicit ordering.
    #[error("cursor pagination requires an explicit ordering")]
    CursorRequiresOrder,

    /// Cursor pagination was requested without an explicit limit.
    #[error("cursor pagination requires an explicit limit")]
    CursorRequiresLimit,

    /// A cursor token was used outside of `.page().execute()`.
    #[error("cursor tokens can only be used with .page().execute()")]
    CursorRequiresPagedExecution,
}
710
711impl From<policy::CursorPagingPolicyError> for IntentError {
712 fn from(err: policy::CursorPagingPolicyError) -> Self {
713 match err {
714 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
715 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
716 }
717 }
718}
719
720fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
722 match order {
723 Some(mut spec) => {
724 spec.fields.push((field.to_string(), direction));
725 spec
726 }
727 None => OrderSpec {
728 fields: vec![(field.to_string(), direction)],
729 },
730 }
731}
732
733fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
738 let mut order = order?;
739 if order.fields.is_empty() {
740 return Some(order);
741 }
742
743 let pk_field = model.primary_key.name;
744 let mut pk_direction = None;
745 order.fields.retain(|(field, direction)| {
746 if field == pk_field {
747 if pk_direction.is_none() {
748 pk_direction = Some(*direction);
749 }
750 false
751 } else {
752 true
753 }
754 });
755
756 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
757 order.fields.push((pk_field.to_string(), pk_direction));
758
759 Some(order)
760}