1#![expect(clippy::used_underscore_binding)]
2#[cfg(test)]
3mod tests;
4
5mod key_access;
7pub(crate) use crate::db::intent::{DeleteSpec, LoadSpec, QueryMode};
8pub(crate) use key_access::*;
9
10use crate::{
11 db::{
12 consistency::ReadConsistency,
13 executor::ExecutablePlan,
14 policy,
15 query::{
16 explain::ExplainPlan,
17 expr::{FilterExpr, SortExpr, SortLowerError},
18 plan::{
19 AccessPlannedQuery, DeleteLimitSpec, LogicalPlan, OrderDirection, OrderSpec,
20 PageSpec, PlanError,
21 planner::{PlannerError, plan_access},
22 validate::validate_logical_plan_model,
23 },
24 predicate::{
25 Predicate, SchemaInfo, ValidateError, normalize, normalize_enum_literals,
26 validate::reject_unsupported_query_features,
27 },
28 },
29 response::ResponseError,
30 },
31 error::InternalError,
32 model::entity::EntityModel,
33 traits::{EntityKind, FieldValue, SingletonEntity},
34 value::Value,
35};
36use std::marker::PhantomData;
37use thiserror::Error as ThisError;
38
39#[derive(Debug)]
47pub(crate) struct QueryModel<'m, K> {
48 model: &'m EntityModel,
49 mode: QueryMode,
50 predicate: Option<Predicate>,
51 key_access: Option<KeyAccessState<K>>,
52 key_access_conflict: bool,
53 order: Option<OrderSpec>,
54 distinct: bool,
55 consistency: ReadConsistency,
56}
57
58impl<'m, K: FieldValue> QueryModel<'m, K> {
59 #[must_use]
60 pub(crate) const fn new(model: &'m EntityModel, consistency: ReadConsistency) -> Self {
61 Self {
62 model,
63 mode: QueryMode::Load(LoadSpec::new()),
64 predicate: None,
65 key_access: None,
66 key_access_conflict: false,
67 order: None,
68 distinct: false,
69 consistency,
70 }
71 }
72
73 #[must_use]
75 pub(crate) const fn mode(&self) -> QueryMode {
76 self.mode
77 }
78
79 #[must_use]
80 fn has_explicit_order(&self) -> bool {
81 policy::has_explicit_order(self.order.as_ref())
82 }
83
84 #[must_use]
85 const fn load_spec(&self) -> Option<LoadSpec> {
86 match self.mode {
87 QueryMode::Load(spec) => Some(spec),
88 QueryMode::Delete(_) => None,
89 }
90 }
91
92 #[must_use]
94 pub(crate) fn filter(mut self, predicate: Predicate) -> Self {
95 self.predicate = match self.predicate.take() {
96 Some(existing) => Some(Predicate::And(vec![existing, predicate])),
97 None => Some(predicate),
98 };
99 self
100 }
101
102 pub(crate) fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
104 let schema = SchemaInfo::from_entity_model(self.model)?;
105 let predicate = expr.lower_with(&schema).map_err(QueryError::Validate)?;
106
107 Ok(self.filter(predicate))
108 }
109
110 pub(crate) fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
112 let schema = SchemaInfo::from_entity_model(self.model)?;
113 let order = match expr.lower_with(&schema) {
114 Ok(order) => order,
115 Err(SortLowerError::Validate(err)) => return Err(QueryError::Validate(err)),
116 Err(SortLowerError::Plan(err)) => return Err(QueryError::from(*err)),
117 };
118
119 policy::validate_order_shape(Some(&order))
120 .map_err(IntentError::from)
121 .map_err(QueryError::from)?;
122
123 Ok(self.order_spec(order))
124 }
125
126 #[must_use]
128 pub(crate) fn order_by(mut self, field: impl AsRef<str>) -> Self {
129 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Asc));
130 self
131 }
132
133 #[must_use]
135 pub(crate) fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
136 self.order = Some(push_order(self.order, field.as_ref(), OrderDirection::Desc));
137 self
138 }
139
140 pub(crate) fn order_spec(mut self, order: OrderSpec) -> Self {
142 self.order = Some(order);
143 self
144 }
145
146 #[must_use]
148 pub(crate) const fn distinct(mut self) -> Self {
149 self.distinct = true;
150 self
151 }
152
153 fn set_key_access(mut self, kind: KeyAccessKind, access: KeyAccess<K>) -> Self {
155 if let Some(existing) = &self.key_access
156 && existing.kind != kind
157 {
158 self.key_access_conflict = true;
159 }
160
161 self.key_access = Some(KeyAccessState { kind, access });
162
163 self
164 }
165
166 pub(crate) fn by_id(self, id: K) -> Self {
168 self.set_key_access(KeyAccessKind::Single, KeyAccess::Single(id))
169 }
170
171 pub(crate) fn by_ids<I>(self, ids: I) -> Self
173 where
174 I: IntoIterator<Item = K>,
175 {
176 self.set_key_access(
177 KeyAccessKind::Many,
178 KeyAccess::Many(ids.into_iter().collect()),
179 )
180 }
181
182 pub(crate) fn only(self, id: K) -> Self {
184 self.set_key_access(KeyAccessKind::Only, KeyAccess::Single(id))
185 }
186
187 #[must_use]
189 pub(crate) const fn delete(mut self) -> Self {
190 if self.mode.is_load() {
191 self.mode = QueryMode::Delete(DeleteSpec::new());
192 }
193 self
194 }
195
196 #[must_use]
200 pub(crate) const fn limit(mut self, limit: u32) -> Self {
201 match self.mode {
202 QueryMode::Load(mut spec) => {
203 spec.limit = Some(limit);
204 self.mode = QueryMode::Load(spec);
205 }
206 QueryMode::Delete(mut spec) => {
207 spec.limit = Some(limit);
208 self.mode = QueryMode::Delete(spec);
209 }
210 }
211 self
212 }
213
214 #[must_use]
216 pub(crate) const fn offset(mut self, offset: u32) -> Self {
217 if let QueryMode::Load(mut spec) = self.mode {
218 spec.offset = offset;
219 self.mode = QueryMode::Load(spec);
220 }
221 self
222 }
223
224 fn build_plan_model(&self) -> Result<AccessPlannedQuery<Value>, QueryError> {
226 let schema_info = SchemaInfo::from_entity_model(self.model)?;
228 self.validate_intent()?;
229
230 let normalized_predicate = self
232 .predicate
233 .as_ref()
234 .map(|predicate| {
235 reject_unsupported_query_features(predicate).map_err(ValidateError::from)?;
236 let predicate = normalize_enum_literals(&schema_info, predicate)?;
237 Ok::<Predicate, ValidateError>(normalize(&predicate))
238 })
239 .transpose()?;
240 let access_plan_value = match &self.key_access {
241 Some(state) => access_plan_from_keys_value(&state.access),
242 None => plan_access(self.model, &schema_info, normalized_predicate.as_ref())?,
243 };
244
245 let logical = LogicalPlan {
247 mode: self.mode,
248 predicate: normalized_predicate,
249 order: canonicalize_order_spec(self.model, self.order.clone()),
252 distinct: self.distinct,
253 delete_limit: match self.mode {
254 QueryMode::Delete(spec) => spec.limit.map(|max_rows| DeleteLimitSpec { max_rows }),
255 QueryMode::Load(_) => None,
256 },
257 page: match self.mode {
258 QueryMode::Load(spec) => {
259 if spec.limit.is_some() || spec.offset > 0 {
260 Some(PageSpec {
261 limit: spec.limit,
262 offset: spec.offset,
263 })
264 } else {
265 None
266 }
267 }
268 QueryMode::Delete(_) => None,
269 },
270 consistency: self.consistency,
271 };
272 let plan = AccessPlannedQuery::from_parts(logical, access_plan_value);
273
274 validate_logical_plan_model(&schema_info, self.model, &plan)?;
275
276 Ok(plan)
277 }
278
279 fn validate_intent(&self) -> Result<(), IntentError> {
281 if self.key_access_conflict {
282 return Err(IntentError::KeyAccessConflict);
283 }
284
285 policy::validate_intent_plan_shape(self.mode, self.order.as_ref())
286 .map_err(IntentError::from)?;
287
288 if let Some(state) = &self.key_access {
289 match state.kind {
290 KeyAccessKind::Many if self.predicate.is_some() => {
291 return Err(IntentError::ByIdsWithPredicate);
292 }
293 KeyAccessKind::Only if self.predicate.is_some() => {
294 return Err(IntentError::OnlyWithPredicate);
295 }
296 _ => {
297 }
299 }
300 }
301
302 Ok(())
303 }
304}
305
306#[derive(Debug)]
324pub struct PlannedQuery<E: EntityKind> {
325 plan: AccessPlannedQuery<E::Key>,
326 _marker: PhantomData<E>,
327}
328
329impl<E: EntityKind> PlannedQuery<E> {
330 #[must_use]
331 pub(in crate::db) const fn new(plan: AccessPlannedQuery<E::Key>) -> Self {
332 Self {
333 plan,
334 _marker: PhantomData,
335 }
336 }
337
338 #[must_use]
339 pub fn explain(&self) -> ExplainPlan {
340 self.plan.explain_with_model(E::MODEL)
341 }
342
343 #[must_use]
344 pub(in crate::db) fn into_inner(self) -> AccessPlannedQuery<E::Key> {
345 self.plan
346 }
347}
348
349impl<E: EntityKind> From<PlannedQuery<E>> for ExecutablePlan<E> {
350 fn from(value: PlannedQuery<E>) -> Self {
351 Self::new(value.into_inner())
352 }
353}
354
355#[derive(Debug)]
356pub struct Query<E: EntityKind> {
357 intent: QueryModel<'static, E::Key>,
358 _marker: PhantomData<E>,
359}
360
361impl<E: EntityKind> Query<E> {
362 #[must_use]
366 pub const fn new(consistency: ReadConsistency) -> Self {
367 Self {
368 intent: QueryModel::new(E::MODEL, consistency),
369 _marker: PhantomData,
370 }
371 }
372
373 #[must_use]
375 pub const fn mode(&self) -> QueryMode {
376 self.intent.mode()
377 }
378
379 #[must_use]
380 pub(crate) fn has_explicit_order(&self) -> bool {
381 self.intent.has_explicit_order()
382 }
383
384 #[must_use]
385 pub(crate) const fn load_spec(&self) -> Option<LoadSpec> {
386 self.intent.load_spec()
387 }
388
389 #[must_use]
391 pub fn filter(mut self, predicate: Predicate) -> Self {
392 self.intent = self.intent.filter(predicate);
393 self
394 }
395
396 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
398 let Self { intent, _marker } = self;
399 let intent = intent.filter_expr(expr)?;
400
401 Ok(Self { intent, _marker })
402 }
403
404 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
406 let Self { intent, _marker } = self;
407 let intent = intent.sort_expr(expr)?;
408
409 Ok(Self { intent, _marker })
410 }
411
412 #[must_use]
414 pub fn order_by(mut self, field: impl AsRef<str>) -> Self {
415 self.intent = self.intent.order_by(field);
416 self
417 }
418
419 #[must_use]
421 pub fn order_by_desc(mut self, field: impl AsRef<str>) -> Self {
422 self.intent = self.intent.order_by_desc(field);
423 self
424 }
425
426 #[must_use]
428 pub fn distinct(mut self) -> Self {
429 self.intent = self.intent.distinct();
430 self
431 }
432
433 pub(crate) fn by_id(self, id: E::Key) -> Self {
435 let Self { intent, _marker } = self;
436 Self {
437 intent: intent.by_id(id),
438 _marker,
439 }
440 }
441
442 pub(crate) fn by_ids<I>(self, ids: I) -> Self
444 where
445 I: IntoIterator<Item = E::Key>,
446 {
447 let Self { intent, _marker } = self;
448 Self {
449 intent: intent.by_ids(ids),
450 _marker,
451 }
452 }
453
454 #[must_use]
456 pub fn delete(mut self) -> Self {
457 self.intent = self.intent.delete();
458 self
459 }
460
461 #[must_use]
467 pub fn limit(mut self, limit: u32) -> Self {
468 self.intent = self.intent.limit(limit);
469 self
470 }
471
472 #[must_use]
476 pub fn offset(mut self, offset: u32) -> Self {
477 self.intent = self.intent.offset(offset);
478 self
479 }
480
481 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
483 let plan = self.planned()?;
484
485 Ok(plan.explain())
486 }
487
488 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
490 let plan = self.build_plan()?;
491
492 Ok(PlannedQuery::new(plan))
493 }
494
495 pub fn plan(&self) -> Result<ExecutablePlan<E>, QueryError> {
497 self.planned().map(ExecutablePlan::from)
498 }
499
500 fn build_plan(&self) -> Result<AccessPlannedQuery<E::Key>, QueryError> {
502 let plan_value = self.intent.build_plan_model()?;
503 let (logical, access) = plan_value.into_parts();
504 let access = access_plan_to_entity_keys::<E>(E::MODEL, access)?;
505 let plan = AccessPlannedQuery::from_parts(logical, access);
506
507 Ok(plan)
508 }
509}
510
511impl<E> Query<E>
512where
513 E: EntityKind + SingletonEntity,
514 E::Key: Default,
515{
516 pub(crate) fn only(self) -> Self {
518 let Self { intent, _marker } = self;
519
520 Self {
521 intent: intent.only(E::Key::default()),
522 _marker,
523 }
524 }
525}
526
527#[derive(Debug, ThisError)]
532pub enum QueryError {
533 #[error("{0}")]
534 Validate(#[from] ValidateError),
535
536 #[error("{0}")]
537 Plan(Box<PlanError>),
538
539 #[error("{0}")]
540 Intent(#[from] IntentError),
541
542 #[error("{0}")]
543 Response(#[from] ResponseError),
544
545 #[error("{0}")]
546 Execute(#[from] InternalError),
547}
548
549impl From<PlannerError> for QueryError {
550 fn from(err: PlannerError) -> Self {
551 match err {
552 PlannerError::Plan(err) => Self::from(*err),
553 PlannerError::Internal(err) => Self::Execute(*err),
554 }
555 }
556}
557
558impl From<PlanError> for QueryError {
559 fn from(err: PlanError) -> Self {
560 Self::Plan(Box::new(err))
561 }
562}
563
564#[derive(Clone, Copy, Debug, ThisError)]
569pub enum IntentError {
570 #[error("{0}")]
571 PlanShape(#[from] policy::PlanPolicyError),
572
573 #[error("by_ids() cannot be combined with predicates")]
574 ByIdsWithPredicate,
575
576 #[error("only() cannot be combined with predicates")]
577 OnlyWithPredicate,
578
579 #[error("multiple key access methods were used on the same query")]
580 KeyAccessConflict,
581
582 #[error("cursor pagination requires an explicit ordering")]
583 CursorRequiresOrder,
584
585 #[error("cursor pagination requires an explicit limit")]
586 CursorRequiresLimit,
587
588 #[error("cursor tokens can only be used with .page().execute()")]
589 CursorRequiresPagedExecution,
590}
591
592impl From<policy::CursorPagingPolicyError> for IntentError {
593 fn from(err: policy::CursorPagingPolicyError) -> Self {
594 match err {
595 policy::CursorPagingPolicyError::CursorRequiresOrder => Self::CursorRequiresOrder,
596 policy::CursorPagingPolicyError::CursorRequiresLimit => Self::CursorRequiresLimit,
597 }
598 }
599}
600
601fn push_order(order: Option<OrderSpec>, field: &str, direction: OrderDirection) -> OrderSpec {
603 match order {
604 Some(mut spec) => {
605 spec.fields.push((field.to_string(), direction));
606 spec
607 }
608 None => OrderSpec {
609 fields: vec![(field.to_string(), direction)],
610 },
611 }
612}
613
614fn canonicalize_order_spec(model: &EntityModel, order: Option<OrderSpec>) -> Option<OrderSpec> {
619 let mut order = order?;
620 if order.fields.is_empty() {
621 return Some(order);
622 }
623
624 let pk_field = model.primary_key.name;
625 let mut pk_direction = None;
626 order.fields.retain(|(field, direction)| {
627 if field == pk_field {
628 if pk_direction.is_none() {
629 pk_direction = Some(*direction);
630 }
631 false
632 } else {
633 true
634 }
635 });
636
637 let pk_direction = pk_direction.unwrap_or(OrderDirection::Asc);
638 order.fields.push((pk_field.to_string(), pk_direction));
639
640 Some(order)
641}