1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 explain::ExplainPlan,
14 expr::{FilterExpr, SortExpr, SortLowerError},
15 plan::{
16 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
17 PageSpec, PlanError,
18 planner::{PlannerError, plan_access},
19 validate::validate_logical_plan_model,
20 },
21 policy,
22 predicate::{
23 Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
24 validate::reject_unsupported_query_features,
25 },
26 },
27 response::ResponseError,
28 },
29 error::InternalError,
30 model::entity::EntityModel,
31 traits::{EntityKind, FieldValue, SingletonEntity},
32 value::Value,
33};
34use std::marker::PhantomData;
35use thiserror::Error as ThisError;
36
37#[derive(Clone, Copy, Debug, Eq, PartialEq)]
45pub enum QueryMode {
46 Load(LoadSpec),
47 Delete(DeleteSpec),
48}
49
50impl QueryMode {
51 #[must_use]
53 pub const fn is_load(&self) -> bool {
54 match self {
55 Self::Load(_) => true,
56 Self::Delete(_) => false,
57 }
58 }
59
60 #[must_use]
62 pub const fn is_delete(&self) -> bool {
63 match self {
64 Self::Delete(_) => true,
65 Self::Load(_) => false,
66 }
67 }
68}
69
70#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
77pub struct LoadSpec {
78 pub limit: Option<u32>,
79 pub offset: u32,
80}
81
82impl LoadSpec {
83 #[must_use]
85 pub const fn new() -> Self {
86 Self {
87 limit: None,
88 offset: 0,
89 }
90 }
91}
92
93#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
100pub struct DeleteSpec {
101 pub limit: Option<u32>,
102}
103
104impl DeleteSpec {
105 #[must_use]
107 pub const fn new() -> Self {
108 Self { limit: None }
109 }
110}
111
112#[derive(Debug)]
120pub(crate) struct QueryModel<'m, K> {
121 model: &'m EntityModel,
122 mode: QueryMode,
123 predicate: Option<Predicate>,
124 key_access: Option<KeyAccessState<K>>,
125 key_access_conflict: bool,
126 order: Option<OrderSpec>,
127 distinct: bool,
128 consistency: ReadConsistency,
129}
130
131impl<'m, K: FieldValue> QueryModel<'m, K> {
132 #[must_use]
133 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
134 Self {
135 model,
136 mode: QueryMode::Load(LoadSpec::new()),
137 predicate: None,
138 key_access: None,
139 key_access_conflict: false,
140 order: None,
141 distinct: false,
142 consistency,
143 }
144 }
145
146 #[must_use]
148 pub(crate) const fn mode(&self) -> QueryMode {
149 self.mode
150 }
151
152 #[must_use]
153 fn has_explicit_order(&self) -> bool {
154 policy::has_explicit_order(self.order.as_ref())
155 }
156
157 #[must_use]
158 const fn load_spec(&self) -> Option<LoadSpec> {
159 match self.mode {
160 QueryMode::Load(spec) => Some(spec),
161 QueryMode::Delete(_) => None,
162 }
163 }
164
165 #[must_use]
167 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
168 self.predicate = match self.predicate.take() {
169 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
170 None => Some(predicate),
171 };
172 self
173 }
174
175 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
177 let schema = SchemaInfo::from_entity_model(self.model)?;
178 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
179
180 Ok(self.filter(predicate))
181 }
182
183 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
185 let schema = SchemaInfo::from_entity_model(self.model)?;
186 let order = match expr.lower_with(&schema) {
187 Ok(order) => order,
188 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
189 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
190 };
191
192 policy::validate_order_shape(Some(&order))
193 .map_err(IntentError::from)
194 .map_err(QueryError::from)?;
195
196 Ok(self.order_spec(order))
197 }
198
199 #[must_use]
201 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
202 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
203 self
204 }
205
206 #[must_use]
208 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
209 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
210 self
211 }
212
213 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
215 self.order = Some(order);
216 self
217 }
218
219 #[must_use]
221 pub(crate) const fn distinct(mut self) -> Self {
222 self.distinct = true;
223 self
224 }
225
226 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
228 if let Some(existing) = &self.key_access
229 && existing.kind != kind
230 {
231 self.key_access_conflict = true;
232 }
233
234 self.key_access = Some(KeyAccessState { kind, access });
235
236 self
237 }
238
239 pub(crate) fn by_id(self, id: K) -> Self {
241 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
242 }
243
244 pub(crate) fn by_ids<I>(self, ids: I) -> Self
246 where
247 I: IntoIterator<Item = K>,
248 {
249 self.set_key_access(
250 KeyAccessKind::Many,
251 KeyAccess::Many(ids.into_iter().collect()),
252 )
253 }
254
255 pub(crate) fn only(self, id: K) -> Self {
257 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
258 }
259
260 #[must_use]
262 pub(crate) const fn delete(mut self) -> Self {
263 if self.mode.is_load() {
264 self.mode = QueryMode::Delete(DeleteSpec::new());
265 }
266 self
267 }
268
269 #[must_use]
273 pub(crate) const fn limit(mut self, limit: u32) -> Self {
274 match self.mode {
275 QueryMode::Load(mut spec) => {
276 spec.limit = Some(limit);
277 self.mode = QueryMode::Load(spec);
278 }
279 QueryMode::Delete(mut spec) => {
280 spec.limit = Some(limit);
281 self.mode = QueryMode::Delete(spec);
282 }
283 }
284 self
285 }
286
287 #[must_use]
289 pub(crate) const fn offset(mut self, offset: u32) -> Self {
290 if let QueryMode::Load(mut spec) = self.mode {
291 spec.offset = offset;
292 self.mode = QueryMode::Load(spec);
293 }
294 self
295 }
296
297 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
299 let schema_info = SchemaInfo::from_entity_model(self.model)?;
301 self.validate_intent()?;
302
303 let normalized_predicate = self
305 .predicate
306 .as_ref()
307 .map(|predicate| {
308 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
309 let predicate = normalize_enum_literals(&schema_info, predicate)?;
310 Ok::<Predicate, ValidateError>(normalize(&predicate))
311 })
312 .transpose()?;
313 let access_plan_value = match &self.key_access {
314 Some(state) => access_plan_from_keys_value(&state.access),
315 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
316 };
317
318 let logical = LogicalPlan {
320 mode: self.mode,
321 predicate: normalized_predicate,
322 order: canonicalize_order_spec(self.model, self.order.clone()),
325 distinct: self.distinct,
326 delete_limit: match self.mode {
327 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
328 QueryMode::Load(_) => None,
329 },
330 page: match self.mode {
331 QueryMode::Load(spec) => {
332 if spec.limit.is_some() || spec.offset > 0 {
333 Some(PageSpec {
334 limit: spec.limit,
335 offset: spec.offset,
336 })
337 } else {
338 None
339 }
340 }
341 QueryMode::Delete(_) => None,
342 },
343 consistency: self.consistency,
344 };
345 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
346
347 validate_logical_plan_model(&schema_info, self.model, &plan)?;
348
349 Ok(plan)
350 }
351
352 fn validate_intent(&self) -> Result<(), IntentError> {
354 if self.key_access_conflict {
355 return Err(IntentError::KeyAccessConflict);
356 }
357
358 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
359 .map_err(IntentError::from)?;
360
361 if let Some(state) = &self.key_access {
362 match state.kind {
363 KeyAccessKind::Many if self.predicate.is_some() => {
364 return Err(IntentError::ByIdsWithPredicate);
365 }
366 KeyAccessKind::Only if self.predicate.is_some() => {
367 return Err(IntentError::OnlyWithPredicate);
368 }
369 _ => {
370 }
372 }
373 }
374
375 Ok(())
376 }
377}
378
379#[derive(Debug)]
397pub struct PlannedQuery<E: EntityKind> {
398 plan: AccessPlannedQuery<E::Key>,
399 _marker: PhantomData<E>,
400}
401
402impl<E: EntityKind> PlannedQuery<E> {
403 #[must_use]
404 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
405 Self {
406 plan,
407 _marker: PhantomData,
408 }
409 }
410
411 #[must_use]
412 pub fn explain(&self) -> ExplainPlan {
413 self.plan.explain_with_model(E::MODEL)
414 }
415
416 #[must_use]
417 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
418 self.plan
419 }
420}
421
422#[derive(Debug)]
423pub struct Query<E: EntityKind> {
424 intent: QueryModel<'static, E::Key>,
425 _marker: PhantomData<E>,
426}
427
428impl<E: EntityKind> Query<E> {
429 #[must_use]
433 pub const fn new(consistency: ReadConsistency) -> Self {
434 Self {
435 intent: QueryModel::new(E::MODEL, consistency),
436 _marker: PhantomData,
437 }
438 }
439
440 #[must_use]
442 pub const fn mode(&self) -> QueryMode {
443 self.intent.mode()
444 }
445
446 #[must_use]
447 pub(crate) fn has_explicit_order(&self) -> bool {
448 self.intent.has_explicit_order()
449 }
450
451 #[must_use]
452 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
453 self.intent.load_spec()
454 }
455
456 #[must_use]
458 pub fn filter(mut self, predicate: Predicate) -> Self {
459 self.intent = self.intent.filter(predicate);
460 self
461 }
462
463 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
465 let Self { intent, _marker } = self;
466 let intent = intent.filter_expr(expr)?;
467
468 Ok(Self { intent, _marker })
469 }
470
471 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
473 let Self { intent, _marker } = self;
474 let intent = intent.sort_expr(expr)?;
475
476 Ok(Self { intent, _marker })
477 }
478
479 #[must_use]
481 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
482 self.intent = self.intent.order_by(field);
483 self
484 }
485
486 #[must_use]
488 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
489 self.intent = self.intent.order_by_desc(field);
490 self
491 }
492
493 #[must_use]
495 pub fn distinct(mut self) -> Self {
496 self.intent = self.intent.distinct();
497 self
498 }
499
500 pub(crate) fn by_id(self, id: E::Key) -> Self {
502 let Self { intent, _marker } = self;
503 Self {
504 intent: intent.by_id(id),
505 _marker,
506 }
507 }
508
509 pub(crate) fn by_ids<I>(self, ids: I) -> Self
511 where
512 I: IntoIterator<Item = E::Key>,
513 {
514 let Self { intent, _marker } = self;
515 Self {
516 intent: intent.by_ids(ids),
517 _marker,
518 }
519 }
520
521 #[must_use]
523 pub fn delete(mut self) -> Self {
524 self.intent = self.intent.delete();
525 self
526 }
527
528 #[must_use]
534 pub fn limit(mut self, limit: u32) -> Self {
535 self.intent = self.intent.limit(limit);
536 self
537 }
538
539 #[must_use]
543 pub fn offset(mut self, offset: u32) -> Self {
544 self.intent = self.intent.offset(offset);
545 self
546 }
547
548 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
550 let plan = self.planned()?;
551
552 Ok(plan.explain())
553 }
554
555 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
557 let plan = self.build_plan()?;
558
559 Ok(PlannedQuery::new(plan))
560 }
561
562 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
564 let plan_value = self.intent.build_plan_model()?;
565 let (logical, access) = plan_value.into_parts();
566 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
567 let plan = AccessPlannedQuery::from_parts(logical, access);
568
569 Ok(plan)
570 }
571}
572
573impl<E> Query<E>
574where
575 E: EntityKind + SingletonEntity,
576 E::Key: Default,
577{
578 pub(crate) fn only(self) -> Self {
580 let Self { intent, _marker } = self;
581
582 Self {
583 intent: intent.only(E::Key::default()),
584 _marker,
585 }
586 }
587}
588
589#[derive(Debug, ThisError)]
594pub enum QueryError {
595 #[error("{0}")]
596 Validate(#[from] ValidateError),
597
598 #[error("{0}")]
599 Plan(Box<PlanError>),
600
601 #[error("{0}")]
602 Intent(#[from] IntentError),
603
604 #[error("{0}")]
605 Response(#[from] ResponseError),
606
607 #[error("{0}")]
608 Execute(#[from] InternalError),
609}
610
611impl From<PlannerError> for QueryError {
612 fn from(err: PlannerError) -> Self {
613 match err {
614 PlannerError::Plan(err) => Self::from(*err),
615 PlannerError::Internal(err) => Self::Execute(*err),
616 }
617 }
618}
619
620impl From<PlanError> for QueryError {
621 fn from(err: PlanError) -> Self {
622 Self::Plan(Box::new(err))
623 }
624}
625
626#[derive(Clone, Copy, Debug, ThisError)]
631pub enum IntentError {
632 #[error("{0}")]
633 PlanShape(#[from] policy::PlanPolicyError),
634
635 #[error("by_ids() cannot be combined with predicates")]
636 ByIdsWithPredicate,
637
638 #[error("only() cannot be combined with predicates")]
639 OnlyWithPredicate,
640
641 #[error("multiple key access methods were used on the same query")]
642 KeyAccessConflict,
643
644 #[error("cursor pagination requires an explicit ordering")]
645 CursorRequiresOrder,
646
647 #[error("cursor pagination requires an explicit limit")]
648 CursorRequiresLimit,
649
650 #[error("cursor tokens can only be used with .page().execute()")]
651 CursorRequiresPagedExecution,
652}
653
654impl From<policy::CursorPagingPolicyError> for IntentError {
655 fn from(err: policy::CursorPagingPolicyError) -> Self {
656 match err {
657 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
658 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
659 }
660 }
661}
662
663fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
665 match order {
666 Some(mut spec) => {
667 spec.fields.push((field.to_string(), direction));
668 spec
669 }
670 None => OrderSpec {
671 fields: vec![(field.to_string(), direction)],
672 },
673 }
674}
675
676fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
681 let mut order = order?;
682 if order.fields.is_empty() {
683 return Some(order);
684 }
685
686 let pk_field = model.primary_key.name;
687 let mut pk_direction = None;
688 order.fields.retain(|(field, direction)| {
689 if field == pk_field {
690 if pk_direction.is_none() {
691 pk_direction = Some(*direction);
692 }
693 false
694 } else {
695 true
696 }
697 });
698
699 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
700 order.fields.push((pk_field.to_string(), pk_direction));
701
702 Some(order)
703}