1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 explain::ExplainPlan,
14 expr::{FilterExpr, SortExpr, SortLowerError},
15 plan::{
16 DeleteLimitSpec, ExecutablePlan, LogicalPlan, OrderDirection, OrderSpec, PageSpec,
17 PlanError,
18 planner::{PlannerError, plan_access},
19 validate::validate_logical_plan_model,
20 },
21 policy,
22 predicate::{
23 Predicate, PredicateFieldSlots, SchemaInfo, ValidateError, normalize,
24 normalize_enum_literals, validate::reject_unsupported_query_features,
25 },
26 },
27 response::ResponseError,
28 },
29 error::InternalError,
30 model::entity::EntityModel,
31 traits::{EntityKind, FieldValue, SingletonEntity},
32 value::Value,
33};
34use std::marker::PhantomData;
35use thiserror::Error as ThisError;
36
37#[derive(Clone, Copy, Debug, Eq, PartialEq)]
45pub enum QueryMode {
46 Load(LoadSpec),
47 Delete(DeleteSpec),
48}
49
50impl QueryMode {
51 #[must_use]
53 pub const fn is_load(&self) -> bool {
54 match self {
55 Self::Load(_) => true,
56 Self::Delete(_) => false,
57 }
58 }
59
60 #[must_use]
62 pub const fn is_delete(&self) -> bool {
63 match self {
64 Self::Delete(_) => true,
65 Self::Load(_) => false,
66 }
67 }
68}
69
70#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
77pub struct LoadSpec {
78 pub limit: Option<u32>,
79 pub offset: u32,
80}
81
82impl LoadSpec {
83 #[must_use]
85 pub const fn new() -> Self {
86 Self {
87 limit: None,
88 offset: 0,
89 }
90 }
91}
92
93#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
100pub struct DeleteSpec {
101 pub limit: Option<u32>,
102}
103
104impl DeleteSpec {
105 #[must_use]
107 pub const fn new() -> Self {
108 Self { limit: None }
109 }
110}
111
112#[derive(Debug)]
120pub(crate) struct QueryModel<'m, K> {
121 model: &'m EntityModel,
122 mode: QueryMode,
123 predicate: Option<Predicate>,
124 key_access: Option<KeyAccessState<K>>,
125 key_access_conflict: bool,
126 order: Option<OrderSpec>,
127 distinct: bool,
128 consistency: ReadConsistency,
129}
130
131impl<'m, K: FieldValue> QueryModel<'m, K> {
132 #[must_use]
133 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
134 Self {
135 model,
136 mode: QueryMode::Load(LoadSpec::new()),
137 predicate: None,
138 key_access: None,
139 key_access_conflict: false,
140 order: None,
141 distinct: false,
142 consistency,
143 }
144 }
145
146 #[must_use]
148 pub(crate) const fn mode(&self) -> QueryMode {
149 self.mode
150 }
151
152 #[must_use]
153 fn has_explicit_order(&self) -> bool {
154 policy::has_explicit_order(self.order.as_ref())
155 }
156
157 #[must_use]
158 const fn load_spec(&self) -> Option<LoadSpec> {
159 match self.mode {
160 QueryMode::Load(spec) => Some(spec),
161 QueryMode::Delete(_) => None,
162 }
163 }
164
165 #[must_use]
167 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
168 self.predicate = match self.predicate.take() {
169 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
170 None => Some(predicate),
171 };
172 self
173 }
174
175 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
177 let schema = SchemaInfo::from_entity_model(self.model)?;
178 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
179
180 Ok(self.filter(predicate))
181 }
182
183 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
185 let schema = SchemaInfo::from_entity_model(self.model)?;
186 let order = match expr.lower_with(&schema) {
187 Ok(order) => order,
188 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
189 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
190 };
191
192 policy::validate_order_shape(Some(&order))
193 .map_err(IntentError::from)
194 .map_err(QueryError::from)?;
195
196 Ok(self.order_spec(order))
197 }
198
199 #[must_use]
201 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
202 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
203 self
204 }
205
206 #[must_use]
208 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
209 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
210 self
211 }
212
213 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
215 self.order = Some(order);
216 self
217 }
218
219 #[must_use]
221 pub(crate) const fn distinct(mut self) -> Self {
222 self.distinct = true;
223 self
224 }
225
226 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
228 if let Some(existing) = &self.key_access
229 && existing.kind != kind
230 {
231 self.key_access_conflict = true;
232 }
233
234 self.key_access = Some(KeyAccessState { kind, access });
235
236 self
237 }
238
239 pub(crate) fn by_id(self, id: K) -> Self {
241 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
242 }
243
244 pub(crate) fn by_ids<I>(self, ids: I) -> Self
246 where
247 I: IntoIterator<Item = K>,
248 {
249 self.set_key_access(
250 KeyAccessKind::Many,
251 KeyAccess::Many(ids.into_iter().collect()),
252 )
253 }
254
255 pub(crate) fn only(self, id: K) -> Self {
257 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
258 }
259
260 #[must_use]
262 pub(crate) const fn delete(mut self) -> Self {
263 if self.mode.is_load() {
264 self.mode = QueryMode::Delete(DeleteSpec::new());
265 }
266 self
267 }
268
269 #[must_use]
273 pub(crate) const fn limit(mut self, limit: u32) -> Self {
274 match self.mode {
275 QueryMode::Load(mut spec) => {
276 spec.limit = Some(limit);
277 self.mode = QueryMode::Load(spec);
278 }
279 QueryMode::Delete(mut spec) => {
280 spec.limit = Some(limit);
281 self.mode = QueryMode::Delete(spec);
282 }
283 }
284 self
285 }
286
287 #[must_use]
289 pub(crate) const fn offset(mut self, offset: u32) -> Self {
290 if let QueryMode::Load(mut spec) = self.mode {
291 spec.offset = offset;
292 self.mode = QueryMode::Load(spec);
293 }
294 self
295 }
296
297 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
299 let schema_info = SchemaInfo::from_entity_model(self.model)?;
301 self.validate_intent()?;
302
303 let normalized_predicate = self
305 .predicate
306 .as_ref()
307 .map(|predicate| {
308 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
309 let predicate = normalize_enum_literals(&schema_info, predicate)?;
310 Ok::<Predicate, ValidateError>(normalize(&predicate))
311 })
312 .transpose()?;
313 let access_plan_value = match &self.key_access {
314 Some(state) => access_plan_from_keys_value(&state.access),
315 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
316 };
317
318 let plan = LogicalPlan {
320 mode: self.mode,
321 access: access_plan_value,
322 predicate: normalized_predicate,
323 order: canonicalize_order_spec(self.model, self.order.clone()),
326 distinct: self.distinct,
327 delete_limit: match self.mode {
328 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
329 QueryMode::Load(_) => None,
330 },
331 page: match self.mode {
332 QueryMode::Load(spec) => {
333 if spec.limit.is_some() || spec.offset > 0 {
334 Some(PageSpec {
335 limit: spec.limit,
336 offset: spec.offset,
337 })
338 } else {
339 None
340 }
341 }
342 QueryMode::Delete(_) => None,
343 },
344 consistency: self.consistency,
345 };
346
347 validate_logical_plan_model(&schema_info, self.model, &plan)?;
348
349 Ok(plan)
350 }
351
352 fn validate_intent(&self) -> Result<(), IntentError> {
354 if self.key_access_conflict {
355 return Err(IntentError::KeyAccessConflict);
356 }
357
358 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
359 .map_err(IntentError::from)?;
360
361 if let Some(state) = &self.key_access {
362 match state.kind {
363 KeyAccessKind::Many if self.predicate.is_some() => {
364 return Err(IntentError::ByIdsWithPredicate);
365 }
366 KeyAccessKind::Only if self.predicate.is_some() => {
367 return Err(IntentError::OnlyWithPredicate);
368 }
369 _ => {
370 }
372 }
373 }
374
375 Ok(())
376 }
377}
378
379#[derive(Debug)]
391pub struct Query<E: EntityKind> {
392 intent: QueryModel<'static, E::Key>,
393 _marker: PhantomData<E>,
394}
395
396impl<E: EntityKind> Query<E> {
397 #[must_use]
401 pub const fn new(consistency: ReadConsistency) -> Self {
402 Self {
403 intent: QueryModel::new(E::MODEL, consistency),
404 _marker: PhantomData,
405 }
406 }
407
408 #[must_use]
410 pub const fn mode(&self) -> QueryMode {
411 self.intent.mode()
412 }
413
414 #[must_use]
415 pub(crate) fn has_explicit_order(&self) -> bool {
416 self.intent.has_explicit_order()
417 }
418
419 #[must_use]
420 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
421 self.intent.load_spec()
422 }
423
424 #[must_use]
426 pub fn filter(mut self, predicate: Predicate) -> Self {
427 self.intent = self.intent.filter(predicate);
428 self
429 }
430
431 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
433 let Self { intent, _marker } = self;
434 let intent = intent.filter_expr(expr)?;
435
436 Ok(Self { intent, _marker })
437 }
438
439 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
441 let Self { intent, _marker } = self;
442 let intent = intent.sort_expr(expr)?;
443
444 Ok(Self { intent, _marker })
445 }
446
447 #[must_use]
449 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
450 self.intent = self.intent.order_by(field);
451 self
452 }
453
454 #[must_use]
456 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
457 self.intent = self.intent.order_by_desc(field);
458 self
459 }
460
461 #[must_use]
463 pub fn distinct(mut self) -> Self {
464 self.intent = self.intent.distinct();
465 self
466 }
467
468 pub(crate) fn by_id(self, id: E::Key) -> Self {
470 let Self { intent, _marker } = self;
471 Self {
472 intent: intent.by_id(id),
473 _marker,
474 }
475 }
476
477 pub(crate) fn by_ids<I>(self, ids: I) -> Self
479 where
480 I: IntoIterator<Item = E::Key>,
481 {
482 let Self { intent, _marker } = self;
483 Self {
484 intent: intent.by_ids(ids),
485 _marker,
486 }
487 }
488
489 #[must_use]
491 pub fn delete(mut self) -> Self {
492 self.intent = self.intent.delete();
493 self
494 }
495
496 #[must_use]
502 pub fn limit(mut self, limit: u32) -> Self {
503 self.intent = self.intent.limit(limit);
504 self
505 }
506
507 #[must_use]
511 pub fn offset(mut self, offset: u32) -> Self {
512 self.intent = self.intent.offset(offset);
513 self
514 }
515
516 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
518 let plan = self.build_plan()?;
519
520 Ok(plan.explain_with_model(E::MODEL))
521 }
522
523 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
525 let plan = self.build_plan()?;
526 let predicate_slots = plan
527 .predicate
528 .as_ref()
529 .map(PredicateFieldSlots::resolve::<E>);
530
531 Ok(ExecutablePlan::new_with_compiled_predicate_slots(
532 plan,
533 predicate_slots,
534 ))
535 }
536
537 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
539 let plan_value = self.intent.build_plan_model()?;
540 let LogicalPlan {
541 mode,
542 access,
543 predicate,
544 order,
545 distinct,
546 delete_limit,
547 page,
548 consistency,
549 } = plan_value;
550
551 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
552 let plan = LogicalPlan {
553 mode,
554 access,
555 predicate,
556 order,
557 distinct,
558 delete_limit,
559 page,
560 consistency,
561 };
562
563 Ok(plan)
564 }
565}
566
567impl<E> Query<E>
568where
569 E: EntityKind + SingletonEntity,
570 E::Key: Default,
571{
572 pub(crate) fn only(self) -> Self {
574 let Self { intent, _marker } = self;
575
576 Self {
577 intent: intent.only(E::Key::default()),
578 _marker,
579 }
580 }
581}
582
583#[derive(Debug, ThisError)]
588pub enum QueryError {
589 #[error("{0}")]
590 Validate(#[from] ValidateError),
591
592 #[error("{0}")]
593 Plan(Box<PlanError>),
594
595 #[error("{0}")]
596 Intent(#[from] IntentError),
597
598 #[error("{0}")]
599 Response(#[from] ResponseError),
600
601 #[error("{0}")]
602 Execute(#[from] InternalError),
603}
604
605impl From<PlannerError> for QueryError {
606 fn from(err: PlannerError) -> Self {
607 match err {
608 PlannerError::Plan(err) => Self::from(*err),
609 PlannerError::Internal(err) => Self::Execute(*err),
610 }
611 }
612}
613
614impl From<PlanError> for QueryError {
615 fn from(err: PlanError) -> Self {
616 Self::Plan(Box::new(err))
617 }
618}
619
620#[derive(Clone, Copy, Debug, ThisError)]
625pub enum IntentError {
626 #[error("{0}")]
627 PlanShape(#[from] policy::PlanPolicyError),
628
629 #[error("by_ids() cannot be combined with predicates")]
630 ByIdsWithPredicate,
631
632 #[error("only() cannot be combined with predicates")]
633 OnlyWithPredicate,
634
635 #[error("multiple key access methods were used on the same query")]
636 KeyAccessConflict,
637
638 #[error("cursor pagination requires an explicit ordering")]
639 CursorRequiresOrder,
640
641 #[error("cursor pagination requires an explicit limit")]
642 CursorRequiresLimit,
643
644 #[error("cursor tokens can only be used with .page().execute()")]
645 CursorRequiresPagedExecution,
646}
647
648impl From<policy::CursorPagingPolicyError> for IntentError {
649 fn from(err: policy::CursorPagingPolicyError) -> Self {
650 match err {
651 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
652 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
653 }
654 }
655}
656
657fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
659 match order {
660 Some(mut spec) => {
661 spec.fields.push((field.to_string(), direction));
662 spec
663 }
664 None => OrderSpec {
665 fields: vec![(field.to_string(), direction)],
666 },
667 }
668}
669
670fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
675 let mut order = order?;
676 if order.fields.is_empty() {
677 return Some(order);
678 }
679
680 let pk_field = model.primary_key.name;
681 let mut pk_direction = None;
682 order.fields.retain(|(field, direction)| {
683 if field == pk_field {
684 if pk_direction.is_none() {
685 pk_direction = Some(*direction);
686 }
687 false
688 } else {
689 true
690 }
691 });
692
693 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
694 order.fields.push((pk_field.to_string(), pk_direction));
695
696 Some(order)
697}