1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 contracts::ReadConsistency,
12 executor::ExecutablePlan,
13 policy,
14 query::{
15 explain::ExplainPlan,
16 expr::{FilterExpr, SortExpr, SortLowerError},
17 plan::{
18 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
19 PageSpec, PlanError,
20 planner::{PlannerError, plan_access},
21 validate::validate_logical_plan_model,
22 },
23 predicate::{
24 Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
25 validate::reject_unsupported_query_features,
26 },
27 },
28 response::ResponseError,
29 },
30 error::InternalError,
31 model::entity::EntityModel,
32 traits::{EntityKind, FieldValue, SingletonEntity},
33 value::Value,
34};
35use std::marker::PhantomData;
36use thiserror::Error as ThisError;
37
38#[derive(Clone, Copy, Debug, Eq, PartialEq)]
47pub enum QueryMode {
48 Load(LoadSpec),
49 Delete(DeleteSpec),
50}
51
52impl QueryMode {
53 #[must_use]
55 pub const fn is_load(&self) -> bool {
56 match self {
57 Self::Load(_) => true,
58 Self::Delete(_) => false,
59 }
60 }
61
62 #[must_use]
64 pub const fn is_delete(&self) -> bool {
65 match self {
66 Self::Delete(_) => true,
67 Self::Load(_) => false,
68 }
69 }
70}
71
72#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
80pub struct LoadSpec {
81 pub limit: Option<u32>,
82 pub offset: u32,
83}
84
85impl LoadSpec {
86 #[must_use]
88 pub const fn new() -> Self {
89 Self {
90 limit: None,
91 offset: 0,
92 }
93 }
94}
95
96#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
104pub struct DeleteSpec {
105 pub limit: Option<u32>,
106}
107
108impl DeleteSpec {
109 #[must_use]
111 pub const fn new() -> Self {
112 Self { limit: None }
113 }
114}
115
116#[derive(Debug)]
124pub(crate) struct QueryModel<'m, K> {
125 model: &'m EntityModel,
126 mode: QueryMode,
127 predicate: Option<Predicate>,
128 key_access: Option<KeyAccessState<K>>,
129 key_access_conflict: bool,
130 order: Option<OrderSpec>,
131 distinct: bool,
132 consistency: ReadConsistency,
133}
134
135impl<'m, K: FieldValue> QueryModel<'m, K> {
136 #[must_use]
137 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
138 Self {
139 model,
140 mode: QueryMode::Load(LoadSpec::new()),
141 predicate: None,
142 key_access: None,
143 key_access_conflict: false,
144 order: None,
145 distinct: false,
146 consistency,
147 }
148 }
149
150 #[must_use]
152 pub(crate) const fn mode(&self) -> QueryMode {
153 self.mode
154 }
155
156 #[must_use]
157 fn has_explicit_order(&self) -> bool {
158 policy::has_explicit_order(self.order.as_ref())
159 }
160
161 #[must_use]
162 const fn load_spec(&self) -> Option<LoadSpec> {
163 match self.mode {
164 QueryMode::Load(spec) => Some(spec),
165 QueryMode::Delete(_) => None,
166 }
167 }
168
169 #[must_use]
171 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
172 self.predicate = match self.predicate.take() {
173 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
174 None => Some(predicate),
175 };
176 self
177 }
178
179 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
181 let schema = SchemaInfo::from_entity_model(self.model)?;
182 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
183
184 Ok(self.filter(predicate))
185 }
186
187 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
189 let schema = SchemaInfo::from_entity_model(self.model)?;
190 let order = match expr.lower_with(&schema) {
191 Ok(order) => order,
192 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
193 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
194 };
195
196 policy::validate_order_shape(Some(&order))
197 .map_err(IntentError::from)
198 .map_err(QueryError::from)?;
199
200 Ok(self.order_spec(order))
201 }
202
203 #[must_use]
205 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
206 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
207 self
208 }
209
210 #[must_use]
212 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
213 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
214 self
215 }
216
217 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
219 self.order = Some(order);
220 self
221 }
222
223 #[must_use]
225 pub(crate) const fn distinct(mut self) -> Self {
226 self.distinct = true;
227 self
228 }
229
230 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
232 if let Some(existing) = &self.key_access
233 && existing.kind != kind
234 {
235 self.key_access_conflict = true;
236 }
237
238 self.key_access = Some(KeyAccessState { kind, access });
239
240 self
241 }
242
243 pub(crate) fn by_id(self, id: K) -> Self {
245 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
246 }
247
248 pub(crate) fn by_ids<I>(self, ids: I) -> Self
250 where
251 I: IntoIterator<Item = K>,
252 {
253 self.set_key_access(
254 KeyAccessKind::Many,
255 KeyAccess::Many(ids.into_iter().collect()),
256 )
257 }
258
259 pub(crate) fn only(self, id: K) -> Self {
261 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
262 }
263
264 #[must_use]
266 pub(crate) const fn delete(mut self) -> Self {
267 if self.mode.is_load() {
268 self.mode = QueryMode::Delete(DeleteSpec::new());
269 }
270 self
271 }
272
273 #[must_use]
277 pub(crate) const fn limit(mut self, limit: u32) -> Self {
278 match self.mode {
279 QueryMode::Load(mut spec) => {
280 spec.limit = Some(limit);
281 self.mode = QueryMode::Load(spec);
282 }
283 QueryMode::Delete(mut spec) => {
284 spec.limit = Some(limit);
285 self.mode = QueryMode::Delete(spec);
286 }
287 }
288 self
289 }
290
291 #[must_use]
293 pub(crate) const fn offset(mut self, offset: u32) -> Self {
294 if let QueryMode::Load(mut spec) = self.mode {
295 spec.offset = offset;
296 self.mode = QueryMode::Load(spec);
297 }
298 self
299 }
300
301 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
303 let schema_info = SchemaInfo::from_entity_model(self.model)?;
305 self.validate_intent()?;
306
307 let normalized_predicate = self
309 .predicate
310 .as_ref()
311 .map(|predicate| {
312 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
313 let predicate = normalize_enum_literals(&schema_info, predicate)?;
314 Ok::<Predicate, ValidateError>(normalize(&predicate))
315 })
316 .transpose()?;
317 let access_plan_value = match &self.key_access {
318 Some(state) => access_plan_from_keys_value(&state.access),
319 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
320 };
321
322 let logical = LogicalPlan {
324 mode: self.mode,
325 predicate: normalized_predicate,
326 order: canonicalize_order_spec(self.model, self.order.clone()),
329 distinct: self.distinct,
330 delete_limit: match self.mode {
331 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
332 QueryMode::Load(_) => None,
333 },
334 page: match self.mode {
335 QueryMode::Load(spec) => {
336 if spec.limit.is_some() || spec.offset > 0 {
337 Some(PageSpec {
338 limit: spec.limit,
339 offset: spec.offset,
340 })
341 } else {
342 None
343 }
344 }
345 QueryMode::Delete(_) => None,
346 },
347 consistency: self.consistency,
348 };
349 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
350
351 validate_logical_plan_model(&schema_info, self.model, &plan)?;
352
353 Ok(plan)
354 }
355
356 fn validate_intent(&self) -> Result<(), IntentError> {
358 if self.key_access_conflict {
359 return Err(IntentError::KeyAccessConflict);
360 }
361
362 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
363 .map_err(IntentError::from)?;
364
365 if let Some(state) = &self.key_access {
366 match state.kind {
367 KeyAccessKind::Many if self.predicate.is_some() => {
368 return Err(IntentError::ByIdsWithPredicate);
369 }
370 KeyAccessKind::Only if self.predicate.is_some() => {
371 return Err(IntentError::OnlyWithPredicate);
372 }
373 _ => {
374 }
376 }
377 }
378
379 Ok(())
380 }
381}
382
383#[derive(Debug)]
401pub struct PlannedQuery<E: EntityKind> {
402 plan: AccessPlannedQuery<E::Key>,
403 _marker: PhantomData<E>,
404}
405
406impl<E: EntityKind> PlannedQuery<E> {
407 #[must_use]
408 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
409 Self {
410 plan,
411 _marker: PhantomData,
412 }
413 }
414
415 #[must_use]
416 pub fn explain(&self) -> ExplainPlan {
417 self.plan.explain_with_model(E::MODEL)
418 }
419
420 #[must_use]
421 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
422 self.plan
423 }
424}
425
426impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
427 fn from(value: PlannedQuery<E>) -> Self {
428 Self::new(value.into_inner())
429 }
430}
431
432#[derive(Debug)]
433pub struct Query<E: EntityKind> {
434 intent: QueryModel<'static, E::Key>,
435 _marker: PhantomData<E>,
436}
437
438impl<E: EntityKind> Query<E> {
439 #[must_use]
443 pub const fn new(consistency: ReadConsistency) -> Self {
444 Self {
445 intent: QueryModel::new(E::MODEL, consistency),
446 _marker: PhantomData,
447 }
448 }
449
450 #[must_use]
452 pub const fn mode(&self) -> QueryMode {
453 self.intent.mode()
454 }
455
456 #[must_use]
457 pub(crate) fn has_explicit_order(&self) -> bool {
458 self.intent.has_explicit_order()
459 }
460
461 #[must_use]
462 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
463 self.intent.load_spec()
464 }
465
466 #[must_use]
468 pub fn filter(mut self, predicate: Predicate) -> Self {
469 self.intent = self.intent.filter(predicate);
470 self
471 }
472
473 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
475 let Self { intent, _marker } = self;
476 let intent = intent.filter_expr(expr)?;
477
478 Ok(Self { intent, _marker })
479 }
480
481 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
483 let Self { intent, _marker } = self;
484 let intent = intent.sort_expr(expr)?;
485
486 Ok(Self { intent, _marker })
487 }
488
489 #[must_use]
491 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
492 self.intent = self.intent.order_by(field);
493 self
494 }
495
496 #[must_use]
498 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
499 self.intent = self.intent.order_by_desc(field);
500 self
501 }
502
503 #[must_use]
505 pub fn distinct(mut self) -> Self {
506 self.intent = self.intent.distinct();
507 self
508 }
509
510 pub(crate) fn by_id(self, id: E::Key) -> Self {
512 let Self { intent, _marker } = self;
513 Self {
514 intent: intent.by_id(id),
515 _marker,
516 }
517 }
518
519 pub(crate) fn by_ids<I>(self, ids: I) -> Self
521 where
522 I: IntoIterator<Item = E::Key>,
523 {
524 let Self { intent, _marker } = self;
525 Self {
526 intent: intent.by_ids(ids),
527 _marker,
528 }
529 }
530
531 #[must_use]
533 pub fn delete(mut self) -> Self {
534 self.intent = self.intent.delete();
535 self
536 }
537
538 #[must_use]
544 pub fn limit(mut self, limit: u32) -> Self {
545 self.intent = self.intent.limit(limit);
546 self
547 }
548
549 #[must_use]
553 pub fn offset(mut self, offset: u32) -> Self {
554 self.intent = self.intent.offset(offset);
555 self
556 }
557
558 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
560 let plan = self.planned()?;
561
562 Ok(plan.explain())
563 }
564
565 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
567 let plan = self.build_plan()?;
568
569 Ok(PlannedQuery::new(plan))
570 }
571
572 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
574 self.planned().map(ExecutablePlan::from)
575 }
576
577 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
579 let plan_value = self.intent.build_plan_model()?;
580 let (logical, access) = plan_value.into_parts();
581 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
582 let plan = AccessPlannedQuery::from_parts(logical, access);
583
584 Ok(plan)
585 }
586}
587
588impl<E> Query<E>
589where
590 E: EntityKind + SingletonEntity,
591 E::Key: Default,
592{
593 pub(crate) fn only(self) -> Self {
595 let Self { intent, _marker } = self;
596
597 Self {
598 intent: intent.only(E::Key::default()),
599 _marker,
600 }
601 }
602}
603
604#[derive(Debug, ThisError)]
609pub enum QueryError {
610 #[error("{0}")]
611 Validate(#[from] ValidateError),
612
613 #[error("{0}")]
614 Plan(Box<PlanError>),
615
616 #[error("{0}")]
617 Intent(#[from] IntentError),
618
619 #[error("{0}")]
620 Response(#[from] ResponseError),
621
622 #[error("{0}")]
623 Execute(#[from] InternalError),
624}
625
626impl From<PlannerError> for QueryError {
627 fn from(err: PlannerError) -> Self {
628 match err {
629 PlannerError::Plan(err) => Self::from(*err),
630 PlannerError::Internal(err) => Self::Execute(*err),
631 }
632 }
633}
634
635impl From<PlanError> for QueryError {
636 fn from(err: PlanError) -> Self {
637 Self::Plan(Box::new(err))
638 }
639}
640
641#[derive(Clone, Copy, Debug, ThisError)]
646pub enum IntentError {
647 #[error("{0}")]
648 PlanShape(#[from] policy::PlanPolicyError),
649
650 #[error("by_ids() cannot be combined with predicates")]
651 ByIdsWithPredicate,
652
653 #[error("only() cannot be combined with predicates")]
654 OnlyWithPredicate,
655
656 #[error("multiple key access methods were used on the same query")]
657 KeyAccessConflict,
658
659 #[error("cursor pagination requires an explicit ordering")]
660 CursorRequiresOrder,
661
662 #[error("cursor pagination requires an explicit limit")]
663 CursorRequiresLimit,
664
665 #[error("cursor tokens can only be used with .page().execute()")]
666 CursorRequiresPagedExecution,
667}
668
669impl From<policy::CursorPagingPolicyError> for IntentError {
670 fn from(err: policy::CursorPagingPolicyError) -> Self {
671 match err {
672 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
673 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
674 }
675 }
676}
677
678fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
680 match order {
681 Some(mut spec) => {
682 spec.fields.push((field.to_string(), direction));
683 spec
684 }
685 None => OrderSpec {
686 fields: vec![(field.to_string(), direction)],
687 },
688 }
689}
690
691fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
696 let mut order = order?;
697 if order.fields.is_empty() {
698 return Some(order);
699 }
700
701 let pk_field = model.primary_key.name;
702 let mut pk_direction = None;
703 order.fields.retain(|(field, direction)| {
704 if field == pk_field {
705 if pk_direction.is_none() {
706 pk_direction = Some(*direction);
707 }
708 false
709 } else {
710 true
711 }
712 });
713
714 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
715 order.fields.push((pk_field.to_string(), pk_direction));
716
717 Some(order)
718}