1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use key_access::*;
8
9use crate::{
10 db::{
11 query::{
12 ReadConsistency,
13 expr::{FilterExpr, SortExpr, SortLowerError},
14 plan::{
15 DeleteLimitSpec, ExecutablePlan, ExplainPlan, LogicalPlan, OrderDirection,
16 OrderSpec, PageSpec, PlanError,
17 planner::{PlannerError, plan_access},
18 validate::validate_logical_plan_model,
19 },
20 policy,
21 predicate::{
22 Predicate, PredicateFieldSlots, SchemaInfo, ValidateError, normalize,
23 normalize_enum_literals, validate::reject_unsupported_query_features,
24 },
25 },
26 response::ResponseError,
27 },
28 error::InternalError,
29 model::entity::EntityModel,
30 traits::{EntityKind, FieldValue, SingletonEntity},
31 value::Value,
32};
33use std::marker::PhantomData;
34use thiserror::Error as ThisError;
35
36#[derive(Clone, Copy, Debug, Eq, PartialEq)]
44pub enum QueryMode {
45 Load(LoadSpec),
46 Delete(DeleteSpec),
47}
48
49impl QueryMode {
50 #[must_use]
52 pub const fn is_load(&self) -> bool {
53 match self {
54 Self::Load(_) => true,
55 Self::Delete(_) => false,
56 }
57 }
58
59 #[must_use]
61 pub const fn is_delete(&self) -> bool {
62 match self {
63 Self::Delete(_) => true,
64 Self::Load(_) => false,
65 }
66 }
67}
68
/// Paging parameters carried by a load-mode query.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct LoadSpec {
    /// Row limit for the load, if one was requested.
    pub limit: Option<u32>,
    /// Row offset for the load; `0` when none was requested.
    pub offset: u32,
}

impl LoadSpec {
    /// Creates a spec with no limit and a zero offset.
    ///
    /// Produces the same value as `LoadSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            limit: None,
            offset: 0,
        }
    }
}
91
/// Parameters carried by a delete-mode query.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct DeleteSpec {
    /// Row limit for the delete, if one was requested.
    pub limit: Option<u32>,
}

impl DeleteSpec {
    /// Creates a spec with no limit.
    ///
    /// Produces the same value as `DeleteSpec::default()`, but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn new() -> Self {
        Self { limit: None }
    }
}
110
111#[derive(Debug)]
119pub(crate) struct QueryModel<'m, K> {
120 model: &'m EntityModel,
121 mode: QueryMode,
122 predicate: Option<Predicate>,
123 key_access: Option<KeyAccessState<K>>,
124 key_access_conflict: bool,
125 order: Option<OrderSpec>,
126 distinct: bool,
127 consistency: ReadConsistency,
128}
129
130impl<'m, K: FieldValue> QueryModel<'m, K> {
131 #[must_use]
132 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
133 Self {
134 model,
135 mode: QueryMode::Load(LoadSpec::new()),
136 predicate: None,
137 key_access: None,
138 key_access_conflict: false,
139 order: None,
140 distinct: false,
141 consistency,
142 }
143 }
144
145 #[must_use]
147 pub(crate) const fn mode(&self) -> QueryMode {
148 self.mode
149 }
150
151 #[must_use]
152 fn has_explicit_order(&self) -> bool {
153 policy::has_explicit_order(self.order.as_ref())
154 }
155
156 #[must_use]
157 const fn load_spec(&self) -> Option<LoadSpec> {
158 match self.mode {
159 QueryMode::Load(spec) => Some(spec),
160 QueryMode::Delete(_) => None,
161 }
162 }
163
164 #[must_use]
166 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
167 self.predicate = match self.predicate.take() {
168 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
169 None => Some(predicate),
170 };
171 self
172 }
173
174 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
176 let schema = SchemaInfo::from_entity_model(self.model)?;
177 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
178
179 Ok(self.filter(predicate))
180 }
181
182 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
184 let schema = SchemaInfo::from_entity_model(self.model)?;
185 let order = match expr.lower_with(&schema) {
186 Ok(order) => order,
187 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
188 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
189 };
190
191 policy::validate_order_shape(Some(&order))
192 .map_err(IntentError::from)
193 .map_err(QueryError::from)?;
194
195 Ok(self.order_spec(order))
196 }
197
198 #[must_use]
200 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
201 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
202 self
203 }
204
205 #[must_use]
207 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
208 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
209 self
210 }
211
212 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
214 self.order = Some(order);
215 self
216 }
217
218 #[must_use]
220 pub(crate) const fn distinct(mut self) -> Self {
221 self.distinct = true;
222 self
223 }
224
225 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
227 if let Some(existing) = &self.key_access
228 && existing.kind != kind
229 {
230 self.key_access_conflict = true;
231 }
232
233 self.key_access = Some(KeyAccessState { kind, access });
234
235 self
236 }
237
238 pub(crate) fn by_id(self, id: K) -> Self {
240 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
241 }
242
243 pub(crate) fn by_ids<I>(self, ids: I) -> Self
245 where
246 I: IntoIterator<Item = K>,
247 {
248 self.set_key_access(
249 KeyAccessKind::Many,
250 KeyAccess::Many(ids.into_iter().collect()),
251 )
252 }
253
254 pub(crate) fn only(self, id: K) -> Self {
256 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
257 }
258
259 #[must_use]
261 pub(crate) const fn delete(mut self) -> Self {
262 if self.mode.is_load() {
263 self.mode = QueryMode::Delete(DeleteSpec::new());
264 }
265 self
266 }
267
268 #[must_use]
272 pub(crate) const fn limit(mut self, limit: u32) -> Self {
273 match self.mode {
274 QueryMode::Load(mut spec) => {
275 spec.limit = Some(limit);
276 self.mode = QueryMode::Load(spec);
277 }
278 QueryMode::Delete(mut spec) => {
279 spec.limit = Some(limit);
280 self.mode = QueryMode::Delete(spec);
281 }
282 }
283 self
284 }
285
286 #[must_use]
288 pub(crate) const fn offset(mut self, offset: u32) -> Self {
289 if let QueryMode::Load(mut spec) = self.mode {
290 spec.offset = offset;
291 self.mode = QueryMode::Load(spec);
292 }
293 self
294 }
295
296 fn build_plan_model(&self) -> Result<LogicalPlan<Value>, QueryError> {
298 let schema_info = SchemaInfo::from_entity_model(self.model)?;
300 self.validate_intent()?;
301
302 let normalized_predicate = self
304 .predicate
305 .as_ref()
306 .map(|predicate| {
307 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
308 let predicate = normalize_enum_literals(&schema_info, predicate)?;
309 Ok::<Predicate, ValidateError>(normalize(&predicate))
310 })
311 .transpose()?;
312 let access_plan_value = match &self.key_access {
313 Some(state) => access_plan_from_keys_value(&state.access),
314 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
315 };
316
317 let plan = LogicalPlan {
319 mode: self.mode,
320 access: access_plan_value,
321 predicate: normalized_predicate,
322 order: canonicalize_order_spec(self.model, self.order.clone()),
325 distinct: self.distinct,
326 delete_limit: match self.mode {
327 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
328 QueryMode::Load(_) => None,
329 },
330 page: match self.mode {
331 QueryMode::Load(spec) => {
332 if spec.limit.is_some() || spec.offset > 0 {
333 Some(PageSpec {
334 limit: spec.limit,
335 offset: spec.offset,
336 })
337 } else {
338 None
339 }
340 }
341 QueryMode::Delete(_) => None,
342 },
343 consistency: self.consistency,
344 };
345
346 validate_logical_plan_model(&schema_info, self.model, &plan)?;
347
348 Ok(plan)
349 }
350
351 fn validate_intent(&self) -> Result<(), IntentError> {
353 if self.key_access_conflict {
354 return Err(IntentError::KeyAccessConflict);
355 }
356
357 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
358 .map_err(IntentError::from)?;
359
360 if let Some(state) = &self.key_access {
361 match state.kind {
362 KeyAccessKind::Many if self.predicate.is_some() => {
363 return Err(IntentError::ByIdsWithPredicate);
364 }
365 KeyAccessKind::Only if self.predicate.is_some() => {
366 return Err(IntentError::OnlyWithPredicate);
367 }
368 _ => {
369 }
371 }
372 }
373
374 Ok(())
375 }
376}
377
378#[derive(Debug)]
390pub struct Query<E: EntityKind> {
391 intent: QueryModel<'static, E::Key>,
392 _marker: PhantomData<E>,
393}
394
395impl<E: EntityKind> Query<E> {
396 #[must_use]
400 pub const fn new(consistency: ReadConsistency) -> Self {
401 Self {
402 intent: QueryModel::new(E::MODEL, consistency),
403 _marker: PhantomData,
404 }
405 }
406
407 #[must_use]
409 pub const fn mode(&self) -> QueryMode {
410 self.intent.mode()
411 }
412
413 #[must_use]
414 pub(crate) fn has_explicit_order(&self) -> bool {
415 self.intent.has_explicit_order()
416 }
417
418 #[must_use]
419 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
420 self.intent.load_spec()
421 }
422
423 #[must_use]
425 pub fn filter(mut self, predicate: Predicate) -> Self {
426 self.intent = self.intent.filter(predicate);
427 self
428 }
429
430 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
432 let Self { intent, _marker } = self;
433 let intent = intent.filter_expr(expr)?;
434
435 Ok(Self { intent, _marker })
436 }
437
438 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
440 let Self { intent, _marker } = self;
441 let intent = intent.sort_expr(expr)?;
442
443 Ok(Self { intent, _marker })
444 }
445
446 #[must_use]
448 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
449 self.intent = self.intent.order_by(field);
450 self
451 }
452
453 #[must_use]
455 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
456 self.intent = self.intent.order_by_desc(field);
457 self
458 }
459
460 #[must_use]
462 pub fn distinct(mut self) -> Self {
463 self.intent = self.intent.distinct();
464 self
465 }
466
467 pub(crate) fn by_id(self, id: E::Key) -> Self {
469 let Self { intent, _marker } = self;
470 Self {
471 intent: intent.by_id(id),
472 _marker,
473 }
474 }
475
476 pub(crate) fn by_ids<I>(self, ids: I) -> Self
478 where
479 I: IntoIterator<Item = E::Key>,
480 {
481 let Self { intent, _marker } = self;
482 Self {
483 intent: intent.by_ids(ids),
484 _marker,
485 }
486 }
487
488 #[must_use]
490 pub fn delete(mut self) -> Self {
491 self.intent = self.intent.delete();
492 self
493 }
494
495 #[must_use]
501 pub fn limit(mut self, limit: u32) -> Self {
502 self.intent = self.intent.limit(limit);
503 self
504 }
505
506 #[must_use]
510 pub fn offset(mut self, offset: u32) -> Self {
511 self.intent = self.intent.offset(offset);
512 self
513 }
514
515 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
517 let plan = self.build_plan()?;
518
519 Ok(plan.explain_with_model(E::MODEL))
520 }
521
522 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
524 let plan = self.build_plan()?;
525 let predicate_slots = plan
526 .predicate
527 .as_ref()
528 .map(PredicateFieldSlots::resolve::<E>);
529
530 Ok(ExecutablePlan::new_with_compiled_predicate_slots(
531 plan,
532 predicate_slots,
533 ))
534 }
535
536 fn build_plan(&self) -> Result<LogicalPlan<E::Key>, QueryError> {
538 let plan_value = self.intent.build_plan_model()?;
539 let LogicalPlan {
540 mode,
541 access,
542 predicate,
543 order,
544 distinct,
545 delete_limit,
546 page,
547 consistency,
548 } = plan_value;
549
550 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
551 let plan = LogicalPlan {
552 mode,
553 access,
554 predicate,
555 order,
556 distinct,
557 delete_limit,
558 page,
559 consistency,
560 };
561
562 Ok(plan)
563 }
564}
565
566impl<E> Query<E>
567where
568 E: EntityKind + SingletonEntity,
569 E::Key: Default,
570{
571 pub(crate) fn only(self) -> Self {
573 let Self { intent, _marker } = self;
574
575 Self {
576 intent: intent.only(E::Key::default()),
577 _marker,
578 }
579 }
580}
581
582#[derive(Debug, ThisError)]
587pub enum QueryError {
588 #[error("{0}")]
589 Validate(#[from] ValidateError),
590
591 #[error("{0}")]
592 Plan(Box<PlanError>),
593
594 #[error("{0}")]
595 Intent(#[from] IntentError),
596
597 #[error("{0}")]
598 Response(#[from] ResponseError),
599
600 #[error("{0}")]
601 Execute(#[from] InternalError),
602}
603
604impl From<PlannerError> for QueryError {
605 fn from(err: PlannerError) -> Self {
606 match err {
607 PlannerError::Plan(err) => Self::from(*err),
608 PlannerError::Internal(err) => Self::Execute(*err),
609 }
610 }
611}
612
613impl From<PlanError> for QueryError {
614 fn from(err: PlanError) -> Self {
615 Self::Plan(Box::new(err))
616 }
617}
618
619#[derive(Clone, Copy, Debug, ThisError)]
624pub enum IntentError {
625 #[error("{0}")]
626 PlanShape(#[from] policy::PlanPolicyError),
627
628 #[error("by_ids() cannot be combined with predicates")]
629 ByIdsWithPredicate,
630
631 #[error("only() cannot be combined with predicates")]
632 OnlyWithPredicate,
633
634 #[error("multiple key access methods were used on the same query")]
635 KeyAccessConflict,
636
637 #[error("cursor pagination requires an explicit ordering")]
638 CursorRequiresOrder,
639
640 #[error("cursor pagination requires an explicit limit")]
641 CursorRequiresLimit,
642
643 #[error("cursor tokens can only be used with .page().execute()")]
644 CursorRequiresPagedExecution,
645}
646
647impl From<policy::CursorPagingPolicyError> for IntentError {
648 fn from(err: policy::CursorPagingPolicyError) -> Self {
649 match err {
650 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
651 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
652 }
653 }
654}
655
656fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
658 match order {
659 Some(mut spec) => {
660 spec.fields.push((field.to_string(), direction));
661 spec
662 }
663 None => OrderSpec {
664 fields: vec![(field.to_string(), direction)],
665 },
666 }
667}
668
669fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
674 let mut order = order?;
675 if order.fields.is_empty() {
676 return Some(order);
677 }
678
679 let pk_field = model.primary_key.name;
680 let mut pk_direction = None;
681 order.fields.retain(|(field, direction)| {
682 if field == pk_field {
683 if pk_direction.is_none() {
684 pk_direction = Some(*direction);
685 }
686 false
687 } else {
688 true
689 }
690 });
691
692 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
693 order.fields.push((pk_field.to_string(), pk_direction));
694
695 Some(order)
696}